Skip to content

Commit 2fb9366

Browse files
lifepuzzlefunlifepuzzlefun1
authored andcommitted
[improve][broker] Get lowest PositionImpl from NavigableSet (#18278)
* [cleanup] Direct get lowest PositionImpl from TreeMap change signature from Set<T> to NavigableSet<T> which makes the caller to get lowest PositionImpl more efficient. * change poll to first when call `NavigableSet` * fix check style remove unused import Co-authored-by: wangjinlong <wangjinlong@zhihu.com>
1 parent 43bf682 commit 2fb9366

6 files changed

Lines changed: 27 additions & 20 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.pulsar.broker.delayed;
2020

2121
import com.google.common.annotations.Beta;
22-
import java.util.Set;
22+
import java.util.NavigableSet;
2323
import org.apache.bookkeeper.mledger.impl.PositionImpl;
2424

2525
/**
@@ -53,7 +53,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
5353
/**
5454
* Get a set of position of messages that have already reached the delivery time.
5555
*/
56-
Set<PositionImpl> getScheduledMessages(int maxMessages);
56+
NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);
5757

5858
/**
5959
* Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.netty.util.Timer;
2323
import io.netty.util.TimerTask;
2424
import java.time.Clock;
25-
import java.util.Set;
25+
import java.util.NavigableSet;
2626
import java.util.TreeSet;
2727
import java.util.concurrent.TimeUnit;
2828
import lombok.extern.slf4j.Slf4j;
@@ -146,9 +146,9 @@ public boolean hasMessageAvailable() {
146146
* Get a set of position of messages that have already reached.
147147
*/
148148
@Override
149-
public Set<PositionImpl> getScheduledMessages(int maxMessages) {
149+
public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
150150
int n = maxMessages;
151-
Set<PositionImpl> positions = new TreeSet<>();
151+
NavigableSet<PositionImpl> positions = new TreeSet<>();
152152
long cutoffTime = getCutoffTime();
153153

154154
while (n > 0 && !priorityQueue.isEmpty()) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.common.collect.ComparisonChain;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.NavigableSet;
2425
import java.util.Set;
2526
import javax.annotation.concurrent.NotThreadSafe;
2627
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -136,7 +137,7 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
136137
return false;
137138
}
138139

139-
public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
140+
public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
140141
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
141142
}
142143
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Comparator;
2828
import java.util.List;
2929
import java.util.Map;
30+
import java.util.NavigableSet;
3031
import java.util.Optional;
3132
import java.util.Set;
3233
import java.util.concurrent.CompletableFuture;
@@ -278,7 +279,7 @@ public synchronized void readMoreEntries() {
278279
return;
279280
}
280281

281-
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
282+
NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
282283

283284
if (!messagesToReplayNow.isEmpty()) {
284285
if (log.isDebugEnabled()) {
@@ -287,7 +288,7 @@ public synchronized void readMoreEntries() {
287288
}
288289

289290
havePendingReplayRead = true;
290-
minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
291+
minReplayedPosition = messagesToReplayNow.first();
291292
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
292293
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
293294
// clear already acked positions from replay bucket
@@ -311,11 +312,14 @@ public synchronized void readMoreEntries() {
311312
consumerList.size());
312313
}
313314
havePendingRead = true;
314-
Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
315-
minReplayedPosition = toReplay.stream().findFirst().orElse(null);
316-
if (minReplayedPosition != null) {
315+
NavigableSet<PositionImpl> toReplay = getMessagesToReplayNow(1);
316+
if (!toReplay.isEmpty()) {
317+
minReplayedPosition = toReplay.first();
317318
redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
319+
} else {
320+
minReplayedPosition = null;
318321
}
322+
319323
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
320324
ReadType.Normal, topic.getMaxReadPosition());
321325
} else {
@@ -1043,17 +1047,17 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
10431047
}
10441048
}
10451049

1046-
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
1050+
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
10471051
if (!redeliveryMessages.isEmpty()) {
10481052
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
10491053
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
10501054
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
1051-
Set<PositionImpl> messagesAvailableNow =
1055+
NavigableSet<PositionImpl> messagesAvailableNow =
10521056
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
10531057
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
10541058
return messagesAvailableNow;
10551059
} else {
1056-
return Collections.emptySet();
1060+
return Collections.emptyNavigableSet();
10571061
}
10581062
}
10591063

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.LinkedHashMap;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.NavigableSet;
3233
import java.util.Set;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.atomic.AtomicInteger;
@@ -184,9 +185,10 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
184185
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
185186
// This may happen when consumer closed. See issue #12885 for details.
186187
if (!allowOutOfOrderDelivery) {
187-
Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
188+
NavigableSet<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
188189
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
189-
PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
190+
PositionImpl replayPosition = messagesToReplayNow.first();
191+
190192
// We have received a message potentially from the delayed tracker and, since we're not using it
191193
// right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
192194
// resend it (until we disconnect consumer).
@@ -458,13 +460,13 @@ private boolean removeConsumersFromRecentJoinedConsumers() {
458460
}
459461

460462
@Override
461-
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
463+
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
462464
if (isDispatcherStuckOnReplays) {
463465
// If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
464466
// messages kicks in), instead of keep replaying the same old messages, since the consumer that these
465467
// messages are routing to might be busy at the moment
466468
this.isDispatcherStuckOnReplays = false;
467-
return Collections.emptySet();
469+
return Collections.emptyNavigableSet();
468470
} else {
469471
return super.getMessagesToReplayNow(maxMessagesToRead);
470472
}

pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Map;
2323
import java.util.NavigableMap;
2424
import java.util.NavigableSet;
25-
import java.util.Set;
2625
import java.util.TreeMap;
2726
import java.util.TreeSet;
2827
import java.util.concurrent.locks.ReadWriteLock;
@@ -95,7 +94,8 @@ public void removeUpTo(long item1, long item2) {
9594
}
9695

9796

98-
public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
97+
public <T extends Comparable<T>> NavigableSet<T> items(int numberOfItems,
98+
LongPairSet.LongPairFunction<T> longPairConverter) {
9999
NavigableSet<T> items = new TreeSet<>();
100100
lock.readLock().lock();
101101
try {

0 commit comments

Comments
 (0)