Skip to content

Commit 53bbd4a

Browse files
authored
[improve][broker] Reduce unnecessary MessageMetadata parsing by caching the parsed instance in the broker cache (#24682)
1 parent 9715653 commit 53bbd4a

29 files changed

Lines changed: 382 additions & 140 deletions

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.netty.buffer.ByteBuf;
2222
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
2323
import org.apache.bookkeeper.common.annotation.InterfaceStability;
24+
import org.apache.pulsar.common.api.proto.MessageMetadata;
25+
import org.apache.pulsar.common.protocol.Commands;
2426

2527
/**
2628
* An Entry represents the data of a ledger entry and its associated position.
@@ -98,4 +100,27 @@ default boolean hasExpectedReads() {
98100
default boolean matchesPosition(Position position) {
99101
return position != null && position.compareTo(getLedgerId(), getEntryId()) == 0;
100102
}
103+
104+
default MessageMetadata getMessageMetadata() {
105+
return null;
106+
}
107+
108+
/**
109+
* Returns the timestamp of the entry.
110+
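* @return the broker timestamp if the broker entry metadata is present and has one, otherwise the publish time from the message metadata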
111+
*/
112+
default long getEntryTimestamp() {
113+
// get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor
114+
return Commands.peekBrokerEntryMetadataToLong(getDataBuffer(), brokerEntryMetadata -> {
115+
if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
116+
return brokerEntryMetadata.getBrokerTimestamp();
117+
}
118+
// otherwise get the publish_time
119+
MessageMetadata messageMetadata = getMessageMetadata();
120+
if (messageMetadata == null) {
121+
messageMetadata = Commands.peekMessageMetadata(getDataBuffer(), null, -1);
122+
}
123+
return messageMetadata.getPublishTime();
124+
});
125+
}
101126
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import io.netty.util.Recycler;
2525
import io.netty.util.Recycler.Handle;
2626
import io.netty.util.ReferenceCounted;
27+
import lombok.Getter;
28+
import lombok.Setter;
29+
import lombok.extern.slf4j.Slf4j;
2730
import org.apache.bookkeeper.client.api.LedgerEntry;
2831
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
2932
import org.apache.bookkeeper.mledger.Entry;
@@ -33,7 +36,10 @@
3336
import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
3437
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
3538
import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
39+
import org.apache.pulsar.common.api.proto.MessageMetadata;
40+
import org.apache.pulsar.common.protocol.Commands;
3641

42+
@Slf4j
3743
public final class EntryImpl extends AbstractCASReferenceCounted
3844
implements ReferenceCountedEntry, Comparable<EntryImpl> {
3945

@@ -51,6 +57,8 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
5157
ByteBuf data;
5258
private EntryReadCountHandler readCountHandler;
5359
private boolean decreaseReadCountOnRelease = true;
60+
@Getter @Setter
61+
private MessageMetadata messageMetadata;
5462

5563
private Runnable onDeallocate;
5664

@@ -161,6 +169,7 @@ public static EntryImpl create(EntryImpl other) {
161169
entry.entryId = other.entryId;
162170
entry.data = other.data.retainedDuplicate();
163171
entry.readCountHandler = other.readCountHandler;
172+
entry.messageMetadata = other.messageMetadata;
164173
entry.setRefCnt(1);
165174
return entry;
166175
}
@@ -172,6 +181,7 @@ public static EntryImpl create(Entry other) {
172181
entry.entryId = other.getEntryId();
173182
entry.data = other.getDataBuffer().retainedDuplicate();
174183
entry.readCountHandler = other.getReadCountHandler();
184+
entry.messageMetadata = other.getMessageMetadata();
175185
entry.setRefCnt(1);
176186
return entry;
177187
}
@@ -277,6 +287,7 @@ protected void deallocate() {
277287
position = null;
278288
readCountHandler = null;
279289
decreaseReadCountOnRelease = true;
290+
messageMetadata = null;
280291
recyclerHandle.recycle(this);
281292
}
282293

@@ -294,6 +305,19 @@ public void setDecreaseReadCountOnRelease(boolean enabled) {
294305
decreaseReadCountOnRelease = enabled;
295306
}
296307

308+
public void initializeMessageMetadataIfNeeded(String managedLedgerName) {
309+
if (messageMetadata == null) {
310+
try {
311+
MessageMetadata msgMetadata = new MessageMetadata();
312+
Commands.peekMessageMetadata(data, msgMetadata);
313+
this.messageMetadata = msgMetadata;
314+
} catch (Throwable t) {
315+
log.warn("[{}] Failed to parse message metadata for entry {}:{}", managedLedgerName, ledgerId, entryId,
316+
t);
317+
}
318+
}
319+
}
320+
297321
@Override
298322
public String toString() {
299323
return getClass().getName() + "@" + System.identityHashCode(this)

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.netty.buffer.Unpooled;
3232
import io.netty.util.Recycler;
3333
import io.netty.util.Recycler.Handle;
34-
import java.io.IOException;
3534
import java.time.Clock;
3635
import java.util.ArrayList;
3736
import java.util.Collection;
@@ -144,7 +143,6 @@
144143
import org.apache.pulsar.common.policies.data.OffloadPolicies;
145144
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
146145
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
147-
import org.apache.pulsar.common.protocol.Commands;
148146
import org.apache.pulsar.common.util.DateFormatter;
149147
import org.apache.pulsar.common.util.FutureUtil;
150148
import org.apache.pulsar.common.util.LazyLoadableValue;
@@ -1315,9 +1313,9 @@ private CompletableFuture<Long> getEarliestMessagePublishTimeOfPos(Position pos)
13151313
@Override
13161314
public void readEntryComplete(Entry entry, Object ctx) {
13171315
try {
1318-
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
1316+
long entryTimestamp = entry.getEntryTimestamp();
13191317
future.complete(entryTimestamp);
1320-
} catch (IOException e) {
1318+
} catch (Exception e) {
13211319
log.error("Error deserializing message for message position {}", nextPos, e);
13221320
future.completeExceptionally(e);
13231321
} finally {
@@ -4991,4 +4989,9 @@ public void waitForPendingCacheEvictions() {
49914989
throw new RuntimeException(e);
49924990
}
49934991
}
4992+
4993+
boolean shouldCacheAddedEntry() {
4994+
// Avoid caching entries if no cursor has been created
4995+
return getActiveCursors().shouldCacheAddedEntry();
4996+
}
49944997
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -257,23 +257,23 @@ public void run() {
257257
ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
258258
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
259259

260+
// ctx will contain a Position instance only in the case of ShadowManagedLedgerImpl
260261
long ledgerId = ledger != null ? ledger.getId() : ((Position) ctx).getLedgerId();
261-
// Don't insert to the entry cache for the ShadowManagedLedger
262-
if (!(ml instanceof ShadowManagedLedgerImpl)) {
263-
// Avoid caching entries if no cursor has been created
264-
if (ml.getActiveCursors().shouldCacheAddedEntry()) {
265-
int expectedReadCount = 0;
266-
// only use expectedReadCount if cache eviction is enabled by expected read count
267-
if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
268-
expectedReadCount = ml.getActiveCursors().size();
269-
}
270-
EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount);
271-
entry.setDecreaseReadCountOnRelease(false);
272-
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
273-
// insert
274-
ml.entryCache.insert(entry);
275-
entry.release();
262+
263+
// Handle caching for tailing reads
264+
if (ml.shouldCacheAddedEntry()) {
265+
int expectedReadCount = 0;
266+
// only use expectedReadCount if cache eviction is enabled by expected read count
267+
if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
268+
// use the number of active cursors as the expected read count
269+
expectedReadCount = ml.getActiveCursors().size();
276270
}
271+
EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount);
272+
entry.setDecreaseReadCountOnRelease(false);
273+
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
274+
// insert
275+
ml.entryCache.insert(entry);
276+
entry.release();
277277
}
278278

279279
Position lastEntry = PositionFactory.create(ledgerId, entryId);

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,4 +407,9 @@ protected synchronized void updateLedgersIdsComplete(LedgerHandle originalCurren
407407
protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
408408
this.lastLedgerCreatedTimestamp = clock.millis();
409409
}
410+
411+
@Override
412+
boolean shouldCacheAddedEntry() {
413+
return false;
414+
}
410415
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSu
7878
for (LedgerEntry e : ledgerEntries) {
7979
// Insert the entries at the end of the list (they will be unsorted for now)
8080
EntryImpl entry = EntryImpl.create(e, interceptor, 0);
81+
entry.initializeMessageMetadataIfNeeded(ml.getName());
8182
entries.add(entry);
8283
totalSize += entry.getLength();
8384
}
@@ -111,7 +112,7 @@ public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.Read
111112
if (iterator.hasNext()) {
112113
LedgerEntry ledgerEntry = iterator.next();
113114
EntryImpl returnEntry = EntryImpl.create(ledgerEntry, interceptor, 0);
114-
115+
returnEntry.initializeMessageMetadataIfNeeded(ml.getName());
115116
ml.getMbean().recordReadEntriesOpsCacheMisses(1, returnEntry.getLength());
116117
ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
117118
ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,23 @@
4646
class RangeCache {
4747
private final ConcurrentNavigableMap<Position, RangeCacheEntryWrapper> entries;
4848
private final RangeCacheRemovalQueue removalQueue;
49-
private AtomicLong size; // Total size of values stored in cache
49+
private final AtomicLong size; // Total size of values stored in cache
50+
private final String managedLedgerName;
5051

5152
/**
5253
* Construct a new RangeCache.
5354
*/
5455
public RangeCache(RangeCacheRemovalQueue removalQueue) {
56+
this(removalQueue, null);
57+
}
58+
59+
/**
60+
* Construct a new RangeCache.
61+
* @param managedLedgerName the name of the managed ledger this cache belongs to
62+
*/
63+
public RangeCache(RangeCacheRemovalQueue removalQueue, String managedLedgerName) {
5564
this.removalQueue = removalQueue;
65+
this.managedLedgerName = managedLedgerName;
5666
this.entries = new ConcurrentSkipListMap<>();
5767
this.size = new AtomicLong(0);
5868
}
@@ -115,7 +125,7 @@ private ReferenceCountedEntry getValueFromWrapper(Position key, RangeCacheEntryW
115125
if (valueWrapper == null) {
116126
return null;
117127
} else {
118-
ReferenceCountedEntry value = valueWrapper.getValue(key);
128+
ReferenceCountedEntry value = valueWrapper.getValue(key, managedLedgerName);
119129
return getRetainedValueMatchingKey(key, value);
120130
}
121131
}
@@ -124,7 +134,8 @@ private ReferenceCountedEntry getValueFromWrapper(Position key, RangeCacheEntryW
124134
* @apiNote the returned value must be released if it's not null
125135
*/
126136
private ReferenceCountedEntry getValueMatchingEntry(Map.Entry<Position, RangeCacheEntryWrapper> entry) {
127-
ReferenceCountedEntry valueMatchingEntry = RangeCacheEntryWrapper.getValueMatchingMapEntry(entry);
137+
ReferenceCountedEntry valueMatchingEntry =
138+
RangeCacheEntryWrapper.getValueMatchingMapEntry(entry, managedLedgerName);
128139
return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
129140
}
130141

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222
import java.util.Map;
2323
import java.util.concurrent.locks.StampedLock;
2424
import java.util.function.Function;
25+
import lombok.extern.slf4j.Slf4j;
2526
import org.apache.bookkeeper.mledger.Position;
2627
import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
28+
import org.apache.bookkeeper.mledger.impl.EntryImpl;
2729

2830
/**
2931
* Wrapper around the value to store in Map. This is needed to ensure that a specific instance can be removed from
3032
* the map by calling the {@link Map#remove(Object, Object)} method. Certain race conditions could result in the
3133
* wrong value being removed from the map. The instances of this class are recycled to avoid creating new objects.
3234
*/
35+
@Slf4j
3336
class RangeCacheEntryWrapper {
3437
private final Recycler.Handle<RangeCacheEntryWrapper> recyclerHandle;
3538
private static final Recycler<RangeCacheEntryWrapper> RECYCLER = new Recycler<RangeCacheEntryWrapper>() {
@@ -73,12 +76,13 @@ static <R> R withNewInstance(RangeCache rangeCache, Position key, ReferenceCount
7376
/**
7477
* Get the value associated with the key. Returns null if the given key does not match the key stored in this wrapper.
7578
*
76-
* @param key the key to match
79+
* @param key the key to match
80+
* @param managedLedgerName the managed ledger name, used in the log message if parsing the entry's message metadata fails
7781
* @return the value associated with the key, or null if the value has already been recycled or the key does not
7882
* match
7983
*/
80-
ReferenceCountedEntry getValue(Position key) {
81-
return getValueInternal(key, false);
84+
ReferenceCountedEntry getValue(Position key, String managedLedgerName) {
85+
return getValueInternal(key, false, managedLedgerName);
8286
}
8387

8488
/**
@@ -88,8 +92,9 @@ ReferenceCountedEntry getValue(Position key) {
8892
* @return the value associated with the key, or null if the value has already been recycled or the key does not
8993
* exactly match the same instance
9094
*/
91-
static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position, RangeCacheEntryWrapper> entry) {
92-
return entry.getValue().getValueInternal(entry.getKey(), true);
95+
static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position, RangeCacheEntryWrapper> entry,
96+
String managedLedgerName) {
97+
return entry.getValue().getValueInternal(entry.getKey(), true, managedLedgerName);
9398
}
9499

95100
/**
@@ -101,16 +106,20 @@ static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position, RangeC
101106
* key as the one stored in the wrapper. This is used to avoid any races
102107
* when retrieving or removing the entries from the cache when the key and value
103108
* instances are available.
109+
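* @param managedLedgerName the managed ledger name, passed on when the entry's message metadata is initialized and used in the log message if parsing fails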
104110
* @return the value associated with the key, or null if the key does not match
105111
*/
106-
private ReferenceCountedEntry getValueInternal(Position key, boolean requireSameKeyInstance) {
112+
private ReferenceCountedEntry getValueInternal(Position key, boolean requireSameKeyInstance,
113+
String managedLedgerName) {
107114
long stamp = lock.tryOptimisticRead();
108115
Position localKey = this.key;
109116
ReferenceCountedEntry localValue = this.value;
117+
boolean messageMetadataInitialized = localValue != null && localValue.getMessageMetadata() != null;
110118
if (!lock.validate(stamp)) {
111119
stamp = lock.readLock();
112120
localKey = this.key;
113121
localValue = this.value;
122+
messageMetadataInitialized = localValue != null && localValue.getMessageMetadata() != null;
114123
lock.unlockRead(stamp);
115124
}
116125
// check that the given key matches the key associated with the value in the entry
@@ -120,6 +129,19 @@ private ReferenceCountedEntry getValueInternal(Position key, boolean requireSame
120129
if (localKey != key && (requireSameKeyInstance || localKey == null || !localKey.equals(key))) {
121130
return null;
122131
}
132+
// Initialize the metadata if it's not already initialized
133+
if (localValue != null && !messageMetadataInitialized) {
134+
localValue = withWriteLock(wrapper -> {
135+
// ensure that the key still matches
136+
if (wrapper.key != key && (requireSameKeyInstance || wrapper.key == null || !wrapper.key.equals(key))) {
137+
return null;
138+
}
139+
if (wrapper.value instanceof EntryImpl entry) {
140+
entry.initializeMessageMetadataIfNeeded(managedLedgerName);
141+
}
142+
return wrapper.value;
143+
});
144+
}
123145
accessed = true;
124146
return localValue;
125147
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long firstEntry, l
532532
final List<Entry> entriesToReturn = new ArrayList<>(entriesToRead);
533533
for (LedgerEntry e : ledgerEntries) {
534534
EntryImpl entry = EntryImpl.create(e, interceptor, expectedReadCountVal);
535+
entry.initializeMessageMetadataIfNeeded(ml.getName());
535536
entriesToReturn.add(entry);
536537
totalSize += entry.getLength();
537538
if (expectedReadCountVal > 0) {

0 commit comments

Comments
 (0)