Skip to content

Commit 6751ad9

Browse files
committed
[improve][broker] Extract pending ack value packing
1 parent 31e1d39 commit 6751ad9

3 files changed

Lines changed: 149 additions & 44 deletions

File tree

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import it.unimi.dsi.fastutil.ints.IntIntPair;
22+
23+
final class PendingAckValue {
24+
static final int NOT_FOUND = -1;
25+
static final long PACKED_NOT_FOUND = packUnchecked(NOT_FOUND, 0);
26+
27+
private static final long STICKY_KEY_HASH_MASK = 0xFFFF_FFFFL;
28+
29+
private PendingAckValue() {
30+
}
31+
32+
static long pack(int remainingUnacked, int stickyKeyHash) {
33+
if (remainingUnacked < 0) {
34+
throw new IllegalArgumentException("remainingUnacked must be non-negative");
35+
}
36+
return packUnchecked(remainingUnacked, stickyKeyHash);
37+
}
38+
39+
static boolean isNotFound(long packedValue) {
40+
return packedValue == PACKED_NOT_FOUND;
41+
}
42+
43+
static IntIntPair toPair(long packedValue) {
44+
return IntIntPair.of(remainingUnacked(packedValue), stickyKeyHash(packedValue));
45+
}
46+
47+
static int remainingUnacked(long packedValue) {
48+
return (int) (packedValue >> Integer.SIZE);
49+
}
50+
51+
static int stickyKeyHash(long packedValue) {
52+
return (int) packedValue;
53+
}
54+
55+
private static long packUnchecked(int remainingUnacked, int stickyKeyHash) {
56+
return ((long) remainingUnacked << Integer.SIZE) | (stickyKeyHash & STICKY_KEY_HASH_MASK);
57+
}
58+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,7 @@
4343
* running.
4444
*/
4545
public class PendingAcksMap {
46-
static final int PENDING_ACK_NOT_FOUND = -1;
47-
private static final long STICKY_KEY_HASH_MASK = 0xFFFF_FFFFL;
48-
// The remaining unacked count is a non-negative message count. Reserve a packed negative count as the
49-
// Long2LongSortedMap default return value so lookups can distinguish missing entries with a single get.
50-
private static final long PACKED_PENDING_ACK_NOT_FOUND =
51-
packPendingAckValue(PENDING_ACK_NOT_FOUND, 0);
46+
static final int PENDING_ACK_NOT_FOUND = PendingAckValue.NOT_FOUND;
5247

5348
/**
5449
* Callback interface for handling the addition of pending acknowledgments.
@@ -141,7 +136,6 @@ public interface PendingAcksConsumer {
141136
* @return true if the pending ack was added, and it's allowed to send a message, false otherwise
142137
*/
143138
public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) {
144-
long packedValue = packPendingAckValue(remainingUnacked, stickyKeyHash);
145139
try {
146140
writeLock.lock();
147141
// prevent adding sticky hash to pending acks if the PendingAcksMap has already been closed
@@ -158,8 +152,9 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining
158152
}
159153
Long2LongSortedMap ledgerPendingAcks =
160154
pendingAcks.computeIfAbsent(ledgerId, k -> newLedgerPendingAcks());
155+
long packedValue = PendingAckValue.pack(remainingUnacked, stickyKeyHash);
161156
long previous = ledgerPendingAcks.put(entryId, packedValue);
162-
if (previous == PACKED_PENDING_ACK_NOT_FOUND) {
157+
if (PendingAckValue.isNotFound(previous)) {
163158
size++;
164159
}
165160
return true;
@@ -202,8 +197,8 @@ private void processPendingAcks(PendingAcksConsumer processor) {
202197
for (Long2LongMap.Entry e : ledgerPendingAcks.long2LongEntrySet()) {
203198
long entryId = e.getLongKey();
204199
long packedValue = e.getLongValue();
205-
processor.accept(ledgerId, entryId, unpackRemainingUnacked(packedValue),
206-
unpackStickyKeyHash(packedValue));
200+
processor.accept(ledgerId, entryId, PendingAckValue.remainingUnacked(packedValue),
201+
PendingAckValue.stickyKeyHash(packedValue));
207202
}
208203
}
209204
}
@@ -294,7 +289,7 @@ public IntIntPair get(long ledgerId, long entryId) {
294289
return null;
295290
}
296291
long packedValue = ledgerMap.get(entryId);
297-
return packedValue == PACKED_PENDING_ACK_NOT_FOUND ? null : unpackPendingAckValue(packedValue);
292+
return PendingAckValue.isNotFound(packedValue) ? null : PendingAckValue.toPair(packedValue);
298293
} finally {
299294
readLock.unlock();
300295
}
@@ -313,8 +308,8 @@ int getRemainingUnacked(long ledgerId, long entryId) {
313308
return PENDING_ACK_NOT_FOUND;
314309
}
315310
long packedValue = ledgerMap.get(entryId);
316-
return packedValue == PACKED_PENDING_ACK_NOT_FOUND
317-
? PENDING_ACK_NOT_FOUND : unpackRemainingUnacked(packedValue);
311+
return PendingAckValue.isNotFound(packedValue)
312+
? PENDING_ACK_NOT_FOUND : PendingAckValue.remainingUnacked(packedValue);
318313
} finally {
319314
readLock.unlock();
320315
}
@@ -333,7 +328,10 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH
333328
try {
334329
writeLock.lock();
335330
Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId);
336-
long expectedValue = packPendingAckValue(batchSize, stickyKeyHash);
331+
if (batchSize < 0) {
332+
return false;
333+
}
334+
long expectedValue = PendingAckValue.pack(batchSize, stickyKeyHash);
337335
if (ledgerMap == null || ledgerMap.get(entryId) != expectedValue) {
338336
return false;
339337
}
@@ -366,14 +364,14 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt
366364
return false;
367365
}
368366
long packedValue = ledgerMap.get(entryId);
369-
if (packedValue == PACKED_PENDING_ACK_NOT_FOUND) {
367+
if (PendingAckValue.isNotFound(packedValue)) {
370368
return false;
371369
}
372-
int newRemaining = unpackRemainingUnacked(packedValue) - ackedDelta;
370+
int newRemaining = PendingAckValue.remainingUnacked(packedValue) - ackedDelta;
373371
if (newRemaining < 0) {
374372
return false;
375373
}
376-
ledgerMap.put(entryId, packPendingAckValue(newRemaining, unpackStickyKeyHash(packedValue)));
374+
ledgerMap.put(entryId, PendingAckValue.pack(newRemaining, PendingAckValue.stickyKeyHash(packedValue)));
377375
return true;
378376
} finally {
379377
writeLock.unlock();
@@ -395,11 +393,11 @@ public boolean remove(long ledgerId, long entryId) {
395393
return false;
396394
}
397395
long removedEntry = ledgerMap.remove(entryId);
398-
if (removedEntry == PACKED_PENDING_ACK_NOT_FOUND) {
396+
if (PendingAckValue.isNotFound(removedEntry)) {
399397
return false;
400398
}
401399
size--;
402-
handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry));
400+
handleRemovePendingAck(ledgerId, entryId, PendingAckValue.stickyKeyHash(removedEntry));
403401
if (ledgerMap.isEmpty()) {
404402
pendingAcks.remove(ledgerId);
405403
}
@@ -429,15 +427,15 @@ public IntIntPair removeAndGet(long ledgerId, long entryId) {
429427
return null;
430428
}
431429
long removedEntry = ledgerMap.remove(entryId);
432-
if (removedEntry == PACKED_PENDING_ACK_NOT_FOUND) {
430+
if (PendingAckValue.isNotFound(removedEntry)) {
433431
return null;
434432
}
435433
size--;
436-
handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry));
434+
handleRemovePendingAck(ledgerId, entryId, PendingAckValue.stickyKeyHash(removedEntry));
437435
if (ledgerMap.isEmpty()) {
438436
pendingAcks.remove(ledgerId);
439437
}
440-
return unpackPendingAckValue(removedEntry);
438+
return PendingAckValue.toPair(removedEntry);
441439
} finally {
442440
writeLock.unlock();
443441
}
@@ -456,15 +454,15 @@ int removeAndGetRemainingUnacked(long ledgerId, long entryId) {
456454
return PENDING_ACK_NOT_FOUND;
457455
}
458456
long removedEntry = ledgerMap.remove(entryId);
459-
if (removedEntry == PACKED_PENDING_ACK_NOT_FOUND) {
457+
if (PendingAckValue.isNotFound(removedEntry)) {
460458
return PENDING_ACK_NOT_FOUND;
461459
}
462460
size--;
463-
handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry));
461+
handleRemovePendingAck(ledgerId, entryId, PendingAckValue.stickyKeyHash(removedEntry));
464462
if (ledgerMap.isEmpty()) {
465463
pendingAcks.remove(ledgerId);
466464
}
467-
return unpackRemainingUnacked(removedEntry);
465+
return PendingAckValue.remainingUnacked(removedEntry);
468466
} finally {
469467
writeLock.unlock();
470468
}
@@ -533,8 +531,8 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry
533531
return;
534532
}
535533
long packedValue = pendingAckEntry.getLongValue();
536-
int batchSize = unpackRemainingUnacked(packedValue);
537-
int stickyKeyHash = unpackStickyKeyHash(packedValue);
534+
int batchSize = PendingAckValue.remainingUnacked(packedValue);
535+
int stickyKeyHash = PendingAckValue.stickyKeyHash(packedValue);
538536
if (pendingAcksRemoveHandler != null) {
539537
if (!batchStarted) {
540538
pendingAcksRemoveHandler.startBatch();
@@ -571,28 +569,12 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry
571569
}
572570
}
573571

574-
private static long packPendingAckValue(int remainingUnacked, int stickyKeyHash) {
575-
return ((long) remainingUnacked << Integer.SIZE) | (stickyKeyHash & STICKY_KEY_HASH_MASK);
576-
}
577-
578572
private static Long2LongSortedMap newLedgerPendingAcks() {
579573
Long2LongRBTreeMap ledgerPendingAcks = new Long2LongRBTreeMap();
580-
ledgerPendingAcks.defaultReturnValue(PACKED_PENDING_ACK_NOT_FOUND);
574+
ledgerPendingAcks.defaultReturnValue(PendingAckValue.PACKED_NOT_FOUND);
581575
return ledgerPendingAcks;
582576
}
583577

584-
private static IntIntPair unpackPendingAckValue(long packedValue) {
585-
return IntIntPair.of(unpackRemainingUnacked(packedValue), unpackStickyKeyHash(packedValue));
586-
}
587-
588-
private static int unpackRemainingUnacked(long packedValue) {
589-
return (int) (packedValue >> Integer.SIZE);
590-
}
591-
592-
private static int unpackStickyKeyHash(long packedValue) {
593-
return (int) packedValue;
594-
}
595-
596578
private void handleRemovePendingAck(long ledgerId, long entryId, int stickyKeyHash) {
597579
PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get();
598580
if (pendingAcksRemoveHandler != null) {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertThrows;
24+
import static org.testng.Assert.assertTrue;
25+
import it.unimi.dsi.fastutil.ints.IntIntPair;
26+
import org.testng.annotations.Test;
27+
28+
public class PendingAckValueTest {
29+
@Test
30+
public void pack_RoundTripsRemainingUnackedAndStickyKeyHash() {
31+
int[][] values = new int[][] {
32+
{0, 0},
33+
{1, -1},
34+
{Integer.MAX_VALUE, Integer.MIN_VALUE},
35+
{42, Integer.MAX_VALUE},
36+
{1_000_000, -123456789}
37+
};
38+
39+
for (int[] value : values) {
40+
long packedValue = PendingAckValue.pack(value[0], value[1]);
41+
42+
assertFalse(PendingAckValue.isNotFound(packedValue));
43+
assertEquals(PendingAckValue.remainingUnacked(packedValue), value[0]);
44+
assertEquals(PendingAckValue.stickyKeyHash(packedValue), value[1]);
45+
46+
IntIntPair pair = PendingAckValue.toPair(packedValue);
47+
assertEquals(pair.leftInt(), value[0]);
48+
assertEquals(pair.rightInt(), value[1]);
49+
}
50+
}
51+
52+
@Test
53+
public void packedNotFound_UsesReservedNegativeRemainingUnackedValue() {
54+
assertTrue(PendingAckValue.isNotFound(PendingAckValue.PACKED_NOT_FOUND));
55+
assertEquals(PendingAckValue.remainingUnacked(PendingAckValue.PACKED_NOT_FOUND),
56+
PendingAckValue.NOT_FOUND);
57+
assertEquals(PendingAckValue.stickyKeyHash(PendingAckValue.PACKED_NOT_FOUND), 0);
58+
assertFalse(PendingAckValue.isNotFound(PendingAckValue.pack(0, 0)));
59+
}
60+
61+
@Test
62+
public void pack_RejectsNegativeRemainingUnacked() {
63+
assertThrows(IllegalArgumentException.class, () -> PendingAckValue.pack(-1, 0));
64+
}
65+
}

0 commit comments

Comments
 (0)