diff --git a/.gitignore b/.gitignore
index 42309e4d62b..e83db003eed 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,4 +50,4 @@ build/
*.dat
# Downloaded released BookKeeper versions (cached by CI, not committed)
-.released-versions/
\ No newline at end of file
+.released-versions/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 90c8acf5af4..b8613fb76be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -57,6 +57,16 @@ void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte
// TODO: Shouldn't this be async?
ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException, BookieException;
+
+ /**
+ * Read a ledger entry only when it can fit the provided bound.
+ *
+ * <p>{@code maxEntrySize} includes the 4-byte per-entry delimiter used by batched-read response framing.
+ * Implementations return {@code null} when the entry exists but {@code entry.readableBytes() + 4}
+ * exceeds {@code maxEntrySize}.
+ *
+ * @param ledgerId id of the ledger to read from
+ * @param entryId id of the entry to read
+ * @param maxEntrySize byte budget for the entry, counting the 4-byte delimiter
+ * @return the entry, or {@code null} when it exists but does not fit the budget
+ */
+ ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize)
+ throws IOException, NoLedgerException, BookieException;
long readLastAddConfirmed(long ledgerId) throws IOException, BookieException;
PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException;
@@ -127,4 +137,4 @@ public long getEntry() {
}
}
-}
\ No newline at end of file
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 111f5134e78..359f92c0425 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -1166,6 +1166,36 @@ public ByteBuf readEntry(long ledgerId, long entryId)
}
}
+    /**
+     * Reads an entry through the ledger's read-only handle, subject to a size budget, and
+     * records latency and byte statistics for the attempt.
+     *
+     * <p>A {@code null} result (entry present but over budget) is still accounted as a
+     * successful read of zero bytes.
+     */
+    @Override
+    public ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize)
+            throws IOException, NoLedgerException, BookieException {
+        final long startNanos = MathUtils.nowInNano();
+        boolean completed = false;
+        int bytesRead = 0;
+        try {
+            final LedgerDescriptor descriptor = handles.getReadOnlyHandle(ledgerId);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Reading {}@{} with maxEntrySize {}", entryId, ledgerId, maxEntrySize);
+            }
+            final ByteBuf result = descriptor.readEntryIfFits(entryId, maxEntrySize);
+            if (result != null) {
+                bytesRead = result.readableBytes();
+                bookieStats.getReadBytes().addCount(bytesRead);
+            }
+            completed = true;
+            return result;
+        } finally {
+            // Runs for both the normal return and any exception thrown above.
+            final long tookNanos = MathUtils.elapsedNanos(startNanos);
+            if (!completed) {
+                bookieStats.getReadEntryStats().registerFailedEvent(tookNanos, TimeUnit.NANOSECONDS);
+                bookieStats.getReadBytesStats().registerFailedValue(bytesRead);
+            } else {
+                bookieStats.getReadEntryStats().registerSuccessfulEvent(tookNanos, TimeUnit.NANOSECONDS);
+                bookieStats.getReadBytesStats().registerSuccessfulValue(bytesRead);
+            }
+        }
+    }
+
public long readLastAddConfirmed(long ledgerId) throws IOException, BookieException {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
return handle.getLastAddConfirmed();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
index 2696aee3c94..e26c3660bfb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
@@ -836,6 +836,40 @@ public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryExcept
return internalReadEntry(-1L, -1L, location, false /* validateEntry */);
}
+    /**
+     * Reads the entry at {@code entryLocation} only when its payload plus the 4-byte
+     * framing delimiter fits within {@code maxEntrySize}.
+     *
+     * <p>The size header is consulted first, so an oversized entry is rejected before a
+     * payload buffer is allocated or the payload is read.
+     *
+     * @return the entry payload, or {@code null} when the entry does not fit the budget
+     * @throws IOException if the entry lookup fails or the payload read comes up short
+     */
+    @Override
+    public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
+            throws IOException, Bookie.NoEntryException {
+        long entryLogId = logIdForOffset(entryLocation);
+        long pos = posForOffset(entryLocation);
+
+        BufferedReadChannel fc = null;
+        int entrySize;
+        try {
+            fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+
+            ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
+            entrySize = sizeBuff.getInt(0);
+            // Widen before adding: int arithmetic on a size taken from the header buffer
+            // could wrap negative for a huge value and slip past the budget check.
+            if ((long) entrySize + Integer.BYTES > maxEntrySize) {
+                return null;
+            }
+            validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff);
+        } catch (EntryLookupException e) {
+            throw new IOException("Bad entry read from log file id: " + entryLogId, e);
+        }
+
+        ByteBuf data = allocator.buffer(entrySize, entrySize);
+        int rc = readFromLogChannel(entryLogId, fc, data, pos);
+        if (rc != entrySize) {
+            // Short read: give the buffer back before reporting the failure.
+            ReferenceCountUtil.release(data);
+            throw new IOException("Bad entry read from log file id: " + entryLogId,
+                    new EntryLookupException("Short read for " + ledgerId + "@"
+                            + entryId + " in " + entryLogId + "@"
+                            + pos + "(" + rc + "!=" + entrySize + ")"));
+        }
+        data.writerIndex(entrySize);
+        return data;
+    }
+
private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
throws IOException, Bookie.NoEntryException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 4c6b7a9ee4d..550c2393544 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -446,6 +446,44 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
}
}
+    /**
+     * Resolves the entry's offset through the ledger cache, then asks the entry logger for a
+     * size-bounded read at that offset. Each phase is timed into its own stats logger.
+     *
+     * <p>{@link BookieProtocol#LAST_ADD_CONFIRMED} is first translated into the ledger's last
+     * entry id. The result is whatever the entry logger's bounded read returns.
+     */
+    @Override
+    public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
+        final long resolvedEntryId = (entryId == BookieProtocol.LAST_ADD_CONFIRMED)
+                ? ledgerCache.getLastEntry(ledgerId)
+                : entryId;
+
+        // Phase 1: index lookup.
+        long entryOffset;
+        final long lookupStartNanos = MathUtils.nowInNano();
+        boolean offsetResolved = false;
+        try {
+            entryOffset = ledgerCache.getEntryOffset(ledgerId, resolvedEntryId);
+            if (entryOffset == 0) {
+                throw new Bookie.NoEntryException(ledgerId, resolvedEntryId);
+            }
+            offsetResolved = true;
+        } finally {
+            final long lookupNanos = MathUtils.elapsedNanos(lookupStartNanos);
+            if (!offsetResolved) {
+                getOffsetStats.registerFailedEvent(lookupNanos, TimeUnit.NANOSECONDS);
+            } else {
+                getOffsetStats.registerSuccessfulEvent(lookupNanos, TimeUnit.NANOSECONDS);
+            }
+        }
+
+        // Phase 2: bounded read from the entry log.
+        final long readStartNanos = MathUtils.nowInNano();
+        boolean readCompleted = false;
+        try {
+            final ByteBuf result = entryLogger.readEntryIfFits(ledgerId, resolvedEntryId, entryOffset, maxEntrySize);
+            readCompleted = true;
+            return result;
+        } finally {
+            final long readNanos = MathUtils.elapsedNanos(readStartNanos);
+            if (!readCompleted) {
+                getEntryStats.registerFailedEvent(readNanos, TimeUnit.NANOSECONDS);
+            } else {
+                getEntryStats.registerSuccessfulEvent(readNanos, TimeUnit.NANOSECONDS);
+            }
+        }
+    }
+
private void flushOrCheckpoint(boolean isCheckpointFlush)
throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 3c84feb876d..cc118c2d98e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -78,6 +78,21 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) {
abstract long addEntry(ByteBuf entry) throws IOException, BookieException;
abstract ByteBuf readEntry(long entryId) throws IOException, BookieException;
+    /**
+     * Read an entry only when it fits within {@code maxEntrySize}.
+     *
+     * <p>{@code maxEntrySize} includes the 4-byte per-entry delimiter used in batched-read response framing,
+     * so an exact fit is {@code entry.readableBytes() + 4 == maxEntrySize}.
+     *
+     * <p>This base implementation reads the whole entry via {@link #readEntry(long)} and
+     * releases it again when it turns out to be too large.
+     *
+     * @param entryId id of the entry to read
+     * @param maxEntrySize byte budget for the entry, counting the 4-byte delimiter
+     * @return the entry, or {@code null} when it does not fit the budget
+     */
+    ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException {
+        ByteBuf entry = readEntry(entryId);
+        // Integer.BYTES is the 4-byte delimiter; named constant keeps this in line with the
+        // storage-level and entry-logger-level checks.
+        if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
+            entry.release();
+            return null;
+        }
+        return entry;
+    }
+
abstract long getLastAddConfirmed() throws IOException, BookieException;
abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
Watcher watcher)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 00edfb6a9c2..fe2d483566d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -161,6 +161,11 @@ ByteBuf readEntry(long entryId) throws IOException, BookieException {
return ledgerStorage.getEntry(ledgerId, entryId);
}
+ /**
+ * Delegates the size-bounded read to the backing ledger storage, using this descriptor's ledger id.
+ */
+ @Override
+ ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException {
+ return ledgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
+ }
+
@Override
long getLastAddConfirmed() throws IOException, BookieException {
return ledgerStorage.getLastAddConfirmed(ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 6eca6e00108..ff18638a63e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -144,6 +144,21 @@ void initialize(ServerConfiguration conf,
*/
ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException;
+ /**
+ * Read an entry from storage only if its serialized size, including the
+ * 4-byte per-entry framing delimiter, is less than or equal to maxEntrySize.
+ *
+ * Returns {@code null} when the entry exists but does not fit the supplied budget.
+ */
+ default ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
+ ByteBuf entry = getEntry(ledgerId, entryId);
+ if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
+ entry.release();
+ return null;
+ }
+ return entry;
+ }
+
/**
* Get last add confirmed.
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 689668eb938..6b7fdd404e8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -239,6 +239,38 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE
return buffToRet;
}
+    /**
+     * Size-bounded read that consults both the interleaved storage and the memtable.
+     *
+     * <p>For {@link BookieProtocol#LAST_ADD_CONFIRMED} the memtable's last entry is preferred,
+     * falling back to the interleaved storage when the memtable has none. For any other entry
+     * id the interleaved storage is tried first and the memtable is consulted only after a
+     * {@link Bookie.NoEntryException}.
+     *
+     * @return the entry, or {@code null} when it exists but its readable bytes plus the 4-byte
+     *         delimiter exceed {@code maxEntrySize}
+     */
+    @Override
+    public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            EntryKeyValue lastKv = memTable.getLastEntry(ledgerId);
+            if (lastKv != null) {
+                return releaseUnlessFits(lastKv.getValueAsByteBuffer(), maxEntrySize);
+            }
+            return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
+        }
+
+        try {
+            return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
+        } catch (Bookie.NoEntryException nee) {
+            EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
+            if (kv == null) {
+                // Second attempt against the interleaved storage; presumably covers an entry
+                // that left the memtable between the two lookups. Throws if still missing.
+                return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
+            }
+            return releaseUnlessFits(kv.getValueAsByteBuffer(), maxEntrySize);
+        }
+    }
+
+    /**
+     * Returns {@code entry} when its readable bytes plus the 4-byte delimiter fit within
+     * {@code maxEntrySize}; otherwise releases it and returns {@code null}.
+     */
+    private static ByteBuf releaseUnlessFits(ByteBuf entry, long maxEntrySize) {
+        if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
+            entry.release();
+            return null;
+        }
+        return entry;
+    }
+
@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
return interleavedLedgerStorage.getLastAddConfirmed(ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
index c8d127c96ba..b71ab17ff50 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
@@ -67,6 +67,22 @@ ByteBuf readEntry(long entryLocation)
ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
throws IOException, NoEntryException;
+ /**
+ * Read an entry only if its serialized size, including the 4-byte per-entry
+ * framing delimiter, is less than or equal to maxEntrySize.
+ *
+ *
Returns {@code null} when the entry exists but does not fit the supplied budget.
+ */
+ default ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
+ throws IOException, NoEntryException {
+ ByteBuf entry = readEntry(ledgerId, entryId, entryLocation);
+ if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
+ entry.release();
+ return null;
+ }
+ return entry;
+ }
+
/**
* Flush any outstanding writes to disk.
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
index 035981514e9..a4dcd08b8ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
@@ -55,6 +55,7 @@
import org.apache.bookkeeper.bookie.storage.EntryLogIds;
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
+import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
import org.apache.bookkeeper.slogger.Slogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -211,6 +212,41 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
return internalReadEntry(ledgerId, entryId, entryLocation, true);
}
+    /**
+     * Size-bounded read from a direct entry log.
+     *
+     * <p>The entry size is read first. An entry whose size plus the 4-byte delimiter exceeds
+     * {@code maxEntrySize} yields {@code null} without reading its payload. Otherwise the
+     * ledger id and entry id stored at the location are checked against the expected ones
+     * before the payload buffer is read.
+     *
+     * @throws IOException if the ids stored at the location do not match the expected ones
+     * @throws NoEntryException if the reader signals end-of-file for the location
+     */
+    @Override
+    public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
+            throws IOException, NoEntryException {
+        // Upper 32 bits select the log file, lower 32 bits the position inside it.
+        final int logId = (int) (entryLocation >> 32);
+        final int offsetInLog = (int) (entryLocation & 0xFFFFFFFF);
+
+        final long startNanos = System.nanoTime();
+        final LogReader logReader = getReader(logId);
+
+        try {
+            final int payloadSize = logReader.readEntrySizeAt(offsetInLog);
+            ByteBuf payload = null;
+            if (payloadSize + Integer.BYTES <= maxEntrySize) {
+                final long foundLedgerId = logReader.readLongAt(offsetInLog);
+                final long foundEntryId = logReader.readLongAt(offsetInLog + Long.BYTES);
+                final boolean idsMatch = foundLedgerId == ledgerId && foundEntryId == entryId;
+                if (!idsMatch) {
+                    throw new IOException(
+                            exMsg("Bad location").kv("location", entryLocation)
+                                    .kv("expectedLedger", ledgerId).kv("expectedEntry", entryId)
+                                    .kv("foundLedger", foundLedgerId).kv("foundEntry", foundEntryId)
+                                    .toString());
+                }
+                payload = logReader.readBufferAt(offsetInLog, payloadSize);
+            }
+            // Both outcomes (entry returned, or rejected as too large) count as successful reads.
+            stats.getReadEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startNanos), TimeUnit.NANOSECONDS);
+            return payload;
+        } catch (EOFException eof) {
+            stats.getReadEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startNanos), TimeUnit.NANOSECONDS);
+            throw new NoEntryException(
+                    exMsg("Entry location doesn't exist").kv("location", entryLocation).toString(),
+                    ledgerId, entryId);
+        }
+    }
+
private LogReader getReader(int logId) throws IOException {
Cache cache = caches.get();
try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
index 707bf307c05..4ccd6aa9ee3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
@@ -162,7 +162,7 @@ private int readBytesIntoBuf(ByteBuf buf, long offset, int size) throws IOExcept
}
@Override
- public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
+ public int readEntrySizeAt(int offset) throws IOException, EOFException {
assertValidEntryOffset(offset);
int sizeOffset = offset - Integer.BYTES;
if (sizeOffset < 0) {
@@ -188,6 +188,12 @@ public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
.kv("maxSaneEntrySize", maxSaneEntrySize)
.kv("readEntrySize", entrySize).toString());
}
+ return entrySize;
+ }
+
+ @Override
+ public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
+ int entrySize = readEntrySizeAt(offset);
return readBufferAt(offset, entrySize);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java
index 9f865d55699..e37c1606065 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java
@@ -63,6 +63,12 @@ public interface LogReader extends AutoCloseable {
*/
long readLongAt(long offset) throws IOException, EOFException;
+ /**
+ * Read the size of an entry at a given offset.
+ * The size is stored at {@code offset - Integer.BYTES}.
+ *
+ * @param offset offset of the entry itself; the size field precedes it
+ * @return the size value read for the entry at {@code offset}
+ */
+ int readEntrySizeAt(int offset) throws IOException, EOFException;
+
/**
* Read an entry at a given offset.
* The size of the entry must be at (offset - Integer.BYTES).
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 69964c8f81f..c163f0aa9b2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -426,6 +426,11 @@ public long getLocation(long ledgerId, long entryId) throws IOException {
return getLedgerStorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, entryId);
}
+ /**
+ * Forwards the size-bounded read to the per-ledger storage instance chosen by
+ * {@code getLedgerStorage(ledgerId)}.
+ */
+ @Override
+ public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
+ return getLedgerStorage(ledgerId).getEntryIfFits(ledgerId, entryId, maxEntrySize);
+ }
+
private SingleDirectoryDbLedgerStorage getLedgerStorage(long ledgerId) {
return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs));
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 774d10c158f..1977cf828a5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -581,6 +581,18 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE
}
}
+    /**
+     * Size-bounded variant of the entry read: runs {@code doGetEntryIfFits} and records the
+     * outcome in the read-entry stats.
+     *
+     * <p>A {@code null} result (entry present but over budget) is recorded as a successful
+     * event. Only {@link IOException} failures are recorded as failed events here.
+     *
+     * @return the entry, or {@code null} when it exists but does not fit {@code maxEntrySize}
+     */
+    @Override
+    public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
+        long startTime = MathUtils.nowInNano();
+        try {
+            ByteBuf entry = doGetEntryIfFits(ledgerId, entryId, maxEntrySize);
+            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
+            return entry;
+        } catch (IOException e) {
+            recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
+            throw e;
+        }
+    }
+
private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, BookieException {
if (log.isDebugEnabled()) {
log.debug("Get Entry: {}@{}", ledgerId, entryId);
@@ -665,6 +677,100 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book
return entry;
}
+    /**
+     * Looks an entry up for a size-bounded read, probing in order: the active write cache,
+     * the write cache being flushed, the read cache, and finally the entry log via the
+     * location index.
+     *
+     * <p>At every stage an entry whose {@code readableBytes() + Integer.BYTES} exceeds
+     * {@code maxEntrySize} is released and {@code null} is returned instead. Cache hit and
+     * miss counters are updated regardless of whether the entry fits.
+     *
+     * @return the entry, or {@code null} when it exists but does not fit the budget
+     * @throws NoEntryException if the location index has no location for the entry
+     */
+    private ByteBuf doGetEntryIfFits(long ledgerId, long entryId, long maxEntrySize)
+            throws IOException, BookieException {
+        if (log.isDebugEnabled()) {
+            log.debug("Get Entry If Fits: {}@{} maxEntrySize={}", ledgerId, entryId, maxEntrySize);
+        }
+
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return releaseUnlessFits(getLastEntry(ledgerId), maxEntrySize);
+        }
+
+        // Snapshot both write caches. Try an optimistic read first and fall back to the
+        // read lock only when the stamp fails validation.
+        long stamp = writeCacheRotationLock.tryOptimisticRead();
+        WriteCache localWriteCache = writeCache;
+        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+        if (!writeCacheRotationLock.validate(stamp)) {
+            stamp = writeCacheRotationLock.readLock();
+            try {
+                localWriteCache = writeCache;
+                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
+            }
+        }
+
+        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
+        if (entry != null) {
+            dbLedgerStorageStats.getWriteCacheHitCounter().inc();
+            return releaseUnlessFits(entry, maxEntrySize);
+        }
+
+        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
+        if (entry != null) {
+            dbLedgerStorageStats.getWriteCacheHitCounter().inc();
+            return releaseUnlessFits(entry, maxEntrySize);
+        }
+
+        dbLedgerStorageStats.getWriteCacheMissCounter().inc();
+
+        entry = readCache.get(ledgerId, entryId);
+        if (entry != null) {
+            dbLedgerStorageStats.getReadCacheHitCounter().inc();
+            return releaseUnlessFits(entry, maxEntrySize);
+        }
+
+        dbLedgerStorageStats.getReadCacheMissCounter().inc();
+
+        long entryLocation;
+        long locationIndexStartNano = MathUtils.nowInNano();
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                // Only a negative result while in limbo equates to unknown
+                throwIfLimbo(ledgerId);
+
+                throw new NoEntryException(ledgerId, entryId);
+            }
+        } finally {
+            dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency(
+                    MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS);
+        }
+
+        long readEntryStartNano = MathUtils.nowInNano();
+        try {
+            // The entry logger applies the budget itself and returns null when over budget.
+            entry = entryLogger.readEntryIfFits(ledgerId, entryId, entryLocation, maxEntrySize);
+        } finally {
+            dbLedgerStorageStats.getReadFromEntryLogTime().addLatency(
+                    MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS);
+        }
+
+        if (entry == null) {
+            return null;
+        }
+
+        // NOTE(review): no read-ahead is triggered after this entry-log read, although this
+        // class has fillReadAheadCache - confirm that is intentional for batched reads.
+        readCache.put(ledgerId, entryId, entry);
+
+        return entry;
+    }
+
+    /**
+     * Returns {@code entry} when its readable bytes plus the 4-byte delimiter fit within
+     * {@code maxEntrySize}; otherwise releases it and returns {@code null}.
+     */
+    private static ByteBuf releaseUnlessFits(ByteBuf entry, long maxEntrySize) {
+        if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
+            entry.release();
+            return null;
+        }
+        return entry;
+    }
+
private void fillReadAheadCache(long originalLedgerId, long firstEntryId, long firstEntryLocation) {
long readAheadStartNano = MathUtils.nowInNano();
int count = 0;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
index 6db3e143519..c4a7072bf82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
@@ -24,6 +24,7 @@
import io.netty.util.Recycler;
import io.netty.util.ReferenceCounted;
import java.util.concurrent.ExecutorService;
+import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.proto.BookieProtocol.BatchedReadRequest;
import org.apache.bookkeeper.util.ByteBufList;
@@ -59,22 +60,33 @@ protected ReferenceCounted readData() throws Exception {
long frameSize = 24 + 8 + 4;
for (int i = 0; i < maxCount; i++) {
try {
- ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId() + i);
- frameSize += entry.readableBytes() + 4;
if (data == null) {
+ ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId());
+ frameSize += entry.readableBytes() + 4;
data = ByteBufList.get(entry);
} else {
- if (frameSize > maxSize) {
- entry.release();
+ long remainingEntrySize = maxSize - frameSize;
+ if (remainingEntrySize <= 0) {
break;
}
+ ByteBuf entry = requestProcessor.getBookie().readEntryIfFits(
+ request.getLedgerId(), request.getEntryId() + i, remainingEntrySize);
+ if (entry == null) {
+ break;
+ }
+ frameSize += entry.readableBytes() + 4;
data.add(entry);
}
- } catch (Throwable e) {
+ } catch (Bookie.NoEntryException e) {
if (data == null) {
throw e;
}
break;
+ } catch (Throwable e) {
+ if (data != null) {
+ data.release();
+ }
+ throw e;
}
}
return data;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
index 3f14a33d53f..dc459b53b02 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
@@ -391,6 +391,26 @@ public void testRecoverFromLedgersMap() throws Exception {
assertEquals(120, meta.getRemainingSize());
}
+    /**
+     * A budget one byte short of payload + 4-byte delimiter must be rejected with {@code null};
+     * a budget of exactly payload + 4 must return the entry.
+     */
+    @Test
+    public void testReadEntryIfFitsHonorsFramingBudget() throws Exception {
+        long ledgerId = 1L;
+        long entryId = 1L;
+        ByteBuf entry = makeEntry(ledgerId, entryId, 128);
+        ByteBuf actual = null;
+        // Everything after allocation runs under try/finally so a failed assertion
+        // cannot leak the source buffer.
+        try {
+            long location = entryLogger.addEntry(ledgerId, entry.slice());
+            entryLogger.flush();
+
+            assertNull(entryLogger.readEntryIfFits(ledgerId, entryId, location, entry.readableBytes() + 3));
+
+            actual = entryLogger.readEntryIfFits(ledgerId, entryId, location, entry.readableBytes() + 4);
+            assertNotNull(actual);
+            assertEntryEquals(actual, entry);
+        } finally {
+            // ReferenceCountUtil.release tolerates a null argument.
+            ReferenceCountUtil.release(actual);
+            ReferenceCountUtil.release(entry);
+        }
+    }
+
@Test
public void testLedgersMapIsEmpty() throws Exception {
// create some entries
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index 9c75f9c3ad7..fc49b60d133 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -24,6 +24,8 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
@@ -31,6 +33,7 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -46,6 +49,7 @@
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
@@ -110,6 +114,8 @@ void accept(long ledgerId,
long pos);
}
volatile CheckEntryListener testPoint;
+ // Call counters for the two read paths, bumped by the overrides below and zeroed by
+ // resetReadTracking(). They are incremented with ++, which is not atomic on a volatile
+ // field, so counts are only exact when reads happen on a single thread.
+ volatile int readEntryCalls;
+ volatile int readEntryIfFitsCalls;
public TestableDefaultEntryLogger(
ServerConfiguration conf,
@@ -123,6 +129,11 @@ void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedExce
this.testPoint = testPoint;
}
+ /** Zeroes both read-call counters so a test can assert on the calls made after this point. */
+ void resetReadTracking() {
+ readEntryCalls = 0;
+ readEntryIfFitsCalls = 0;
+ }
+
@Override
void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
CheckEntryListener runBefore = testPoint;
@@ -131,6 +142,20 @@ void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupEx
}
super.checkEntry(ledgerId, entryId, location);
}
+
+ /** Counts the call in {@code readEntryCalls}, then defers to the real implementation. */
+ @Override
+ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
+ throws IOException, Bookie.NoEntryException {
+ readEntryCalls++;
+ return super.readEntry(ledgerId, entryId, entryLocation);
+ }
+
+ /** Counts the call in {@code readEntryIfFitsCalls}, then defers to the real implementation. */
+ @Override
+ public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
+ throws IOException, Bookie.NoEntryException {
+ readEntryIfFitsCalls++;
+ return super.readEntryIfFits(ledgerId, entryId, entryLocation, maxEntrySize);
+ }
}
TestStatsProvider statsProvider = new TestStatsProvider();
@@ -244,6 +269,66 @@ public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException {
}
}
+    /**
+     * Verifies that the storage serves bounded reads through the entry logger's
+     * {@code readEntryIfFits} (never the plain {@code readEntry}), both when the entry is one
+     * byte over budget and when it fits exactly.
+     */
+    @Test
+    public void testGetEntryIfFitsUsesBoundedLoggerRead() throws Exception {
+        final long ledger = 0L;
+        final long entry = 0L;
+        final int headerLength = Long.BYTES * 2;
+        final int payloadLength = "entry-0".getBytes(StandardCharsets.UTF_8).length;
+        final long budget = headerLength + payloadLength + Integer.BYTES;
+
+        // One byte short of the exact fit: rejected, and only the bounded path was used.
+        entryLogger.resetReadTracking();
+        final ByteBuf rejected = interleavedStorage.getEntryIfFits(ledger, entry, budget - 1);
+        assertNull(rejected);
+        assertEquals(0, entryLogger.readEntryCalls);
+        assertEquals(1, entryLogger.readEntryIfFitsCalls);
+
+        // Exact fit: entry comes back intact, again through the bounded path only.
+        entryLogger.resetReadTracking();
+        final ByteBuf fitted = interleavedStorage.getEntryIfFits(ledger, entry, budget);
+        try {
+            assertNotNull(fitted);
+            assertEquals(ledger, fitted.getLong(0));
+            assertEquals(entry, fitted.getLong(Long.BYTES));
+            final String payload = fitted.toString(Long.BYTES * 2,
+                    fitted.readableBytes() - (Long.BYTES * 2), StandardCharsets.UTF_8);
+            assertEquals("entry-0", payload);
+            assertEquals(0, entryLogger.readEntryCalls);
+            assertEquals(1, entryLogger.readEntryIfFitsCalls);
+        } finally {
+            if (fitted != null) {
+                fitted.release();
+            }
+        }
+    }
+
+    /**
+     * Same budget checks as the explicit-entry case, but addressing the entry through
+     * {@link BookieProtocol#LAST_ADD_CONFIRMED}. The expected size and entry id are taken
+     * from an unbounded read of the same entry.
+     */
+    @Test
+    public void testGetEntryIfFitsHonorsBudgetForLastAddConfirmed() throws Exception {
+        final long ledger = 0L;
+
+        // Unbounded read to learn the last entry's size and id.
+        final ByteBuf reference = interleavedStorage.getEntry(ledger, BookieProtocol.LAST_ADD_CONFIRMED);
+        final long budget = reference.readableBytes() + Integer.BYTES;
+        final long expectedEntryId = reference.getLong(Long.BYTES);
+        reference.release();
+
+        entryLogger.resetReadTracking();
+        final ByteBuf rejected = interleavedStorage.getEntryIfFits(
+                ledger, BookieProtocol.LAST_ADD_CONFIRMED, budget - 1);
+        assertNull(rejected);
+        assertEquals(0, entryLogger.readEntryCalls);
+        assertEquals(1, entryLogger.readEntryIfFitsCalls);
+
+        entryLogger.resetReadTracking();
+        final ByteBuf fitted = interleavedStorage.getEntryIfFits(
+                ledger, BookieProtocol.LAST_ADD_CONFIRMED, budget);
+        try {
+            assertNotNull(fitted);
+            assertEquals(ledger, fitted.getLong(0));
+            assertEquals(expectedEntryId, fitted.getLong(Long.BYTES));
+            assertEquals(0, entryLogger.readEntryCalls);
+            assertEquals(1, entryLogger.readEntryIfFitsCalls);
+        } finally {
+            if (fitted != null) {
+                fitted.release();
+            }
+        }
+    }
+
@Test
public void testConsistencyCheckConcurrentGC() throws Exception {
final long signalDone = -1;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java
index db83f096d95..3885451e99a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java
@@ -23,6 +23,8 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
@@ -38,12 +40,14 @@
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.DiskChecker;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
/**
* Testing SortedLedgerStorage.
@@ -191,4 +195,47 @@ public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException {
}));
}
}
+
+    /**
+     * Checks the bounded last-add-confirmed read on a {@link SortedLedgerStorage} whose memtable
+     * is stubbed to hold the last entry: one byte under the exact budget yields {@code null},
+     * the exact budget yields the entry, and the wrapped interleaved storage is never touched.
+     */
+    @Test
+    public void testGetEntryIfFitsHonorsBudgetForLastAddConfirmed() throws Exception {
+        final long ledgerId = 0L;
+        final long lastEntryId = 1234L;
+
+        ByteBuf entry = Unpooled.buffer(128);
+        entry.writeLong(ledgerId);
+        entry.writeLong(lastEntryId);
+        entry.writeBytes("lac-entry".getBytes());
+
+        // Copy the readable bytes so the stubbed memtable can return them as an EntryKeyValue.
+        final int payloadLength = entry.readableBytes();
+        byte[] payload = new byte[payloadLength];
+        entry.getBytes(entry.readerIndex(), payload);
+
+        InterleavedLedgerStorage interleaved = Mockito.mock(InterleavedLedgerStorage.class);
+        SortedLedgerStorage storage = new SortedLedgerStorage(interleaved);
+        storage.memTable = Mockito.mock(EntryMemTable.class);
+        EntryKeyValue lastEntry = new EntryKeyValue(ledgerId, lastEntryId, payload);
+        Mockito.when(storage.memTable.getLastEntry(ledgerId)).thenReturn(lastEntry);
+
+        // Entry bytes plus an Integer.BYTES allowance is the smallest budget expected to succeed.
+        final long exactFitSize = payloadLength + Integer.BYTES;
+        try {
+            ByteBuf rejected = storage.getEntryIfFits(
+                    ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, exactFitSize - 1);
+            assertNull(rejected);
+
+            ByteBuf accepted = storage.getEntryIfFits(
+                    ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, exactFitSize);
+            try {
+                assertNotNull(accepted);
+                assertEquals(ledgerId, accepted.getLong(0));
+                assertEquals(lastEntryId, accepted.getLong(Long.BYTES));
+                Mockito.verifyNoInteractions(interleaved);
+            } finally {
+                if (accepted != null) {
+                    accepted.release();
+                }
+            }
+        } finally {
+            entry.release();
+        }
+    }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index dfc2459678b..7b17c9a2c08 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -22,6 +22,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -35,8 +36,10 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
@@ -49,6 +52,8 @@
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LogMark;
import org.apache.bookkeeper.bookie.TestBookieImpl;
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -70,6 +75,134 @@ public class DbLedgerStorageTest {
protected LedgerDirsManager ledgerDirsManager;
protected ServerConfiguration conf;
+    /**
+     * {@link DbLedgerStorage} variant whose per-directory storages are built around a
+     * {@link TrackingEntryLogger}, so tests can count which entry-logger read methods were used.
+     *
+     * <p>The counters are static and therefore shared by every instance; call
+     * {@link #resetReadTracking()} right before the reads a test wants to measure.
+     */
+    protected static class TrackingDbLedgerStorage extends DbLedgerStorage {
+        static final AtomicInteger READ_ENTRY_CALLS = new AtomicInteger();
+        static final AtomicInteger READ_ENTRY_IF_FITS_CALLS = new AtomicInteger();
+
+        /** Sets both read counters back to zero. */
+        static void resetReadTracking() {
+            READ_ENTRY_IF_FITS_CALLS.set(0);
+            READ_ENTRY_CALLS.set(0);
+        }
+
+        @Override
+        protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(
+                ServerConfiguration conf,
+                org.apache.bookkeeper.meta.LedgerManager ledgerManager,
+                LedgerDirsManager ledgerDirsManager,
+                LedgerDirsManager indexDirsManager,
+                EntryLogger entryLogger,
+                org.apache.bookkeeper.stats.StatsLogger statsLogger,
+                long writeCacheSize,
+                long readCacheSize,
+                int readAheadCacheBatchSize,
+                long readAheadCacheBatchBytesSize) throws IOException {
+            // Wrap the supplied entry logger so reads issued through it are counted.
+            EntryLogger countingLogger = new TrackingEntryLogger(entryLogger);
+            return new SingleDirectoryDbLedgerStorage(
+                    conf,
+                    ledgerManager,
+                    ledgerDirsManager,
+                    indexDirsManager,
+                    countingLogger,
+                    statsLogger,
+                    allocator,
+                    writeCacheSize,
+                    readCacheSize,
+                    readAheadCacheBatchSize,
+                    readAheadCacheBatchBytesSize);
+        }
+    }
+
+    /**
+     * {@link EntryLogger} decorator that forwards every call to a delegate and records reads in the
+     * static counters of {@link TrackingDbLedgerStorage}.
+     *
+     * <p>Only {@link #readEntry(long, long, long)} and
+     * {@link #readEntryIfFits(long, long, long, long)} are counted; the single-argument
+     * {@link #readEntry(long)} is forwarded without being counted.
+     */
+    protected static class TrackingEntryLogger implements EntryLogger {
+        private final EntryLogger delegate;
+
+        TrackingEntryLogger(EntryLogger delegate) {
+            this.delegate = delegate;
+        }
+
+        /** Returns the wrapped entry logger. */
+        EntryLogger getDelegate() {
+            return delegate;
+        }
+
+        @Override
+        public long addEntry(long ledgerId, ByteBuf buf) throws IOException {
+            return delegate.addEntry(ledgerId, buf);
+        }
+
+        @Override
+        public ByteBuf readEntry(long entryLocation) throws IOException, NoEntryException {
+            // Deliberately not counted.
+            return delegate.readEntry(entryLocation);
+        }
+
+        @Override
+        public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
+                throws IOException, NoEntryException {
+            TrackingDbLedgerStorage.READ_ENTRY_CALLS.incrementAndGet();
+            return delegate.readEntry(ledgerId, entryId, entryLocation);
+        }
+
+        @Override
+        public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
+                throws IOException, NoEntryException {
+            TrackingDbLedgerStorage.READ_ENTRY_IF_FITS_CALLS.incrementAndGet();
+            return delegate.readEntryIfFits(ledgerId, entryId, entryLocation, maxEntrySize);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            delegate.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            delegate.close();
+        }
+
+        @Override
+        public CompactionEntryLog newCompactionLog(long logToCompact) throws IOException {
+            return delegate.newCompactionLog(logToCompact);
+        }
+
+        // Type arguments restored: the raw Collection return types triggered unchecked warnings.
+        @Override
+        public Collection<CompactionEntryLog> incompleteCompactionLogs() {
+            return delegate.incompleteCompactionLogs();
+        }
+
+        @Override
+        public Collection<Long> getFlushedLogIds() {
+            return delegate.getFlushedLogIds();
+        }
+
+        @Override
+        public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
+            delegate.scanEntryLog(entryLogId, scanner);
+        }
+
+        @Override
+        public org.apache.bookkeeper.bookie.EntryLogMetadata getEntryLogMetadata(
+                long entryLogId,
+                org.apache.bookkeeper.bookie.AbstractLogCompactor.Throttler throttler) throws IOException {
+            return delegate.getEntryLogMetadata(entryLogId, throttler);
+        }
+
+        @Override
+        public boolean logExists(long logId) {
+            return delegate.logExists(logId);
+        }
+
+        @Override
+        public boolean removeEntryLog(long entryLogId) {
+            return delegate.removeEntryLog(entryLogId);
+        }
+    }
+
+    /**
+     * Returns the ledger storage class that {@code setup()} installs into the server configuration.
+     *
+     * @return {@link TrackingDbLedgerStorage}, so that entry-logger reads are counted
+     */
+    protected Class<? extends DbLedgerStorage> getLedgerStorageClass() {
+        return TrackingDbLedgerStorage.class;
+    }
+
+    /**
+     * Strips the {@link TrackingEntryLogger} wrapper, if present, so callers can inspect the
+     * concrete entry logger underneath.
+     *
+     * @param entryLogger the logger to unwrap
+     * @return the delegate when given a {@link TrackingEntryLogger}; otherwise the argument itself
+     */
+    protected EntryLogger unwrapTrackingEntryLogger(EntryLogger entryLogger) {
+        return entryLogger instanceof TrackingEntryLogger
+                ? ((TrackingEntryLogger) entryLogger).getDelegate()
+                : entryLogger;
+    }
+
@Before
public void setup() throws Exception {
tmpDir = File.createTempFile("bkTest", ".dir");
@@ -81,15 +214,17 @@ public void setup() throws Exception {
int gcWaitTime = 1000;
conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
- conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerStorageClass(getLedgerStorageClass().getName());
conf.setLedgerDirNames(new String[] { tmpDir.toString() });
BookieImpl bookie = new TestBookieImpl(conf);
ledgerDirsManager = bookie.getLedgerDirsManager();
storage = (DbLedgerStorage) bookie.getLedgerStorage();
+ TrackingDbLedgerStorage.resetReadTracking();
storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> {
- assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DefaultEntryLogger);
+ assertTrue(unwrapTrackingEntryLogger(singleDirectoryDbLedgerStorage.getEntryLogger())
+ instanceof DefaultEntryLogger);
});
}
@@ -255,6 +390,74 @@ public void testBookieCompaction() throws Exception {
assertEquals(newEntry3, res);
}
+    /**
+     * An entry that was added but not flushed must be reported as not fitting ({@code null}) when
+     * the budget is one byte short of the entry size plus {@code Integer.BYTES}.
+     */
+    @Test
+    public void testGetEntryIfFitsReturnsNullFromWriteCache() throws Exception {
+        final long ledgerId = 4L;
+        final long entryId = 10L;
+
+        storage.setMasterKey(ledgerId, "key".getBytes());
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeLong(9L);
+        entry.writeBytes("cache-entry".getBytes());
+        // The storage gets its own retained view; this test keeps 'entry' to measure its size.
+        storage.addEntry(entry.retainedDuplicate());
+
+        long insufficientBudget = entry.readableBytes() + Integer.BYTES - 1L;
+        ByteBuf result = storage.getEntryIfFits(ledgerId, entryId, insufficientBudget);
+        try {
+            assertNull(result);
+        } finally {
+            // Covers a non-null result too, in case the assertion above failed.
+            ReferenceCountUtil.release(result);
+            ReferenceCountUtil.release(entry);
+        }
+    }
+
+    /**
+     * After a flush, an undersized budget must yield {@code null} through exactly one
+     * {@code readEntryIfFits} call on the entry logger and no three-argument {@code readEntry} call.
+     */
+    @Test
+    public void testGetEntryIfFitsUsesBoundedReadFromEntryLog() throws Exception {
+        final long ledgerId = 4L;
+        final long entryId = 12L;
+
+        storage.setMasterKey(ledgerId, "key".getBytes());
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeLong(11L);
+        entry.writeBytes("entry-log-entry".getBytes());
+        storage.addEntry(entry.retainedDuplicate());
+        storage.flush();
+
+        // Start counting only after the write and flush are done.
+        TrackingDbLedgerStorage.resetReadTracking();
+        long insufficientBudget = entry.readableBytes() + Integer.BYTES - 1L;
+        ByteBuf result = storage.getEntryIfFits(ledgerId, entryId, insufficientBudget);
+        try {
+            assertNull(result);
+            assertEquals(0, TrackingDbLedgerStorage.READ_ENTRY_CALLS.get());
+            assertEquals(1, TrackingDbLedgerStorage.READ_ENTRY_IF_FITS_CALLS.get());
+        } finally {
+            ReferenceCountUtil.release(result);
+            ReferenceCountUtil.release(entry);
+        }
+    }
+
+    /**
+     * Once an entry has been read a first time after a flush, an undersized bounded read must
+     * yield {@code null} without any counted entry-logger read.
+     */
+    @Test
+    public void testGetEntryIfFitsReturnsNullFromReadCache() throws Exception {
+        final long ledgerId = 4L;
+        final long entryId = 11L;
+
+        storage.setMasterKey(ledgerId, "key".getBytes());
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeLong(10L);
+        entry.writeBytes("read-cache-entry".getBytes());
+        storage.addEntry(entry.retainedDuplicate());
+        storage.flush();
+
+        // Warm-up read; per the test name this is expected to leave the entry in the read cache.
+        ByteBuf warmUp = storage.getEntry(ledgerId, entryId);
+        warmUp.release();
+
+        // Counters are reset after the warm-up so only the bounded read below is measured.
+        TrackingDbLedgerStorage.resetReadTracking();
+        long insufficientBudget = entry.readableBytes() + Integer.BYTES - 1L;
+        ByteBuf result = storage.getEntryIfFits(ledgerId, entryId, insufficientBudget);
+        try {
+            assertNull(result);
+            assertEquals(0, TrackingDbLedgerStorage.READ_ENTRY_CALLS.get());
+            assertEquals(0, TrackingDbLedgerStorage.READ_ENTRY_IF_FITS_CALLS.get());
+        } finally {
+            ReferenceCountUtil.release(result);
+            ReferenceCountUtil.release(entry);
+        }
+    }
+
@Test
public void doubleDirectory() throws Exception {
int gcWaitTime = 1000;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java
index 76192cf2b40..554a83188f8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java
@@ -46,16 +46,18 @@ public void setup() throws Exception {
int gcWaitTime = 1000;
conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
- conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerStorageClass(getLedgerStorageClass().getName());
conf.setLedgerDirNames(new String[] { tmpDir.toString() });
conf.setProperty("dbStorage_directIOEntryLogger", true);
BookieImpl bookie = new TestBookieImpl(conf);
ledgerDirsManager = bookie.getLedgerDirsManager();
storage = (DbLedgerStorage) bookie.getLedgerStorage();
+ TrackingDbLedgerStorage.resetReadTracking();
storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> {
- assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DirectEntryLogger);
+ assertTrue(unwrapTrackingEntryLogger(singleDirectoryDbLedgerStorage.getEntryLogger())
+ instanceof DirectEntryLogger);
});
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 93ace7eacb1..75c9565f260 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -816,6 +816,16 @@ public void testBatchReadWithV2Protocol() throws Exception {
entries++;
}
assertEquals(expectEntriesNum, entries);
+
+ // The first entry is still returned even when maxSize is smaller than a single entry frame.
+ entries = 0;
+ for (Enumeration readEntries = lh.batchReadEntries(0, 20, headerSize);
+ readEntries.hasMoreElements();) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertArrayEquals(data, entry.getEntry());
+ entries++;
+ }
+ assertEquals(1, entries);
}
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
index 3f897558384..4e9db2c6318 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
@@ -19,11 +19,15 @@
package org.apache.bookkeeper.proto;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -46,6 +50,7 @@
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
import org.junit.Before;
import org.junit.Test;
@@ -221,4 +226,171 @@ public void testNonFenceRequest() throws Exception {
assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
assertEquals(BookieProtocol.EOK, response.getErrorCode());
}
-}
\ No newline at end of file
+
+    /**
+     * When the bounded read of the second entry reports "does not fit" ({@code null}), the batch
+     * must contain only the first entry and the second entry must have been requested with the
+     * remaining budget.
+     */
+    @Test
+    public void testReadDataReturnsFirstEntryWhenSecondWouldOverflowMaxSize() throws Exception {
+        final long ledgerId = 1234L;
+        final long firstEntryId = 1L;
+        final long secondEntryId = firstEntryId + 1;
+        final int firstEntrySize = 20;
+        final long maxSize = 70;
+        // 70 - (24 + 8 + 4) - (20 + 4) = 10, using the frame arithmetic seen elsewhere in these tests.
+        final long expectedRemainingBudget = 10;
+
+        ByteBuf firstEntry = entryBuffer(firstEntrySize);
+        when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
+        when(bookie.readEntryIfFits(eq(ledgerId), eq(secondEntryId), eq(expectedRemainingBudget)))
+                .thenReturn(null);
+
+        BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize);
+        ByteBufList batch = (ByteBufList) processor.readData();
+        assertNotNull(batch);
+        try {
+            assertEquals(1, batch.size());
+        } finally {
+            batch.release();
+        }
+
+        verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId));
+        verify(bookie, times(1)).readEntryIfFits(eq(ledgerId), eq(secondEntryId), eq(expectedRemainingBudget));
+        // No unbounded read other than the one for the first entry.
+        verify(bookie, times(1)).readEntry(anyLong(), anyLong());
+    }
+
+    /**
+     * With a 50-byte limit and a 20-byte first entry, the batch must still contain that first
+     * entry, and no bounded read may be attempted for later entries.
+     */
+    @Test
+    public void testReadDataReturnsFirstEntryEvenIfItAloneExceedsMaxSize() throws Exception {
+        final long ledgerId = 1235L;
+        final long firstEntryId = 1L;
+        final int firstEntrySize = 20;
+        final long maxSize = 50;
+
+        ByteBuf firstEntry = entryBuffer(firstEntrySize);
+        when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
+
+        BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize);
+        ByteBufList batch = (ByteBufList) processor.readData();
+        assertNotNull(batch);
+        try {
+            assertEquals(1, batch.size());
+        } finally {
+            batch.release();
+        }
+
+        verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId));
+        verify(bookie, never()).readEntryIfFits(anyLong(), anyLong(), anyLong());
+    }
+
+    /**
+     * Sizes the request so that, after the first entry, the remaining budget equals the second
+     * entry plus its 4-byte delimiter, and expects both entries in the batch.
+     */
+    @Test
+    public void testReadDataIncludesSecondEntryWhenRemainingBudgetExactlyFitsEntryAndDelimiter() throws Exception {
+        final long ledgerId = 1236L;
+        final long firstEntryId = 1L;
+        final long secondEntryId = firstEntryId + 1;
+        final int firstEntrySize = 20;
+        final int secondEntrySize = 12;
+        final long exactRemainingBudget = secondEntrySize + 4L;
+        // Fixed header terms, then the first entry frame, then exactly the room the second one needs.
+        final int headerSize = 24 + 8 + 4;
+        final long maxSize = headerSize + firstEntrySize + 4 + exactRemainingBudget;
+
+        ByteBuf firstEntry = entryBuffer(firstEntrySize);
+        ByteBuf secondEntry = entryBuffer(secondEntrySize);
+        when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
+        when(bookie.readEntryIfFits(eq(ledgerId), eq(secondEntryId), eq(exactRemainingBudget)))
+                .thenReturn(secondEntry);
+
+        BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize);
+        ByteBufList batch = (ByteBufList) processor.readData();
+        assertNotNull(batch);
+        try {
+            assertEquals(2, batch.size());
+        } finally {
+            batch.release();
+        }
+
+        verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId));
+        verify(bookie, times(1)).readEntryIfFits(eq(ledgerId), eq(secondEntryId), eq(exactRemainingBudget));
+        verify(bookie, times(1)).readEntry(anyLong(), anyLong());
+    }
+
+    /**
+     * A {@link Bookie.NoEntryException} from the bounded read of the second entry must end the
+     * batch quietly: the result holds just the first entry.
+     */
+    @Test
+    public void testReadDataStopsOnMissingSubsequentEntry() throws Exception {
+        final long ledgerId = 1237L;
+        final long firstEntryId = 1L;
+        final long missingEntryId = firstEntryId + 1;
+        final int firstEntrySize = 20;
+
+        ByteBuf firstEntry = entryBuffer(firstEntrySize);
+        when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
+        when(bookie.readEntryIfFits(eq(ledgerId), eq(missingEntryId), anyLong()))
+                .thenThrow(new Bookie.NoEntryException(ledgerId, missingEntryId));
+
+        BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024);
+        ByteBufList batch = (ByteBufList) processor.readData();
+        assertNotNull(batch);
+        try {
+            assertEquals(1, batch.size());
+        } finally {
+            batch.release();
+        }
+    }
+
+    /**
+     * An {@link IOException} from the bounded read of the second entry must escape
+     * {@code readData()}, and by then the already-read first entry must have been released.
+     */
+    @Test
+    public void testReadDataPropagatesIOExceptionAfterFirstEntryAndReleasesAccumulatedData() throws Exception {
+        final long ledgerId = 1238L;
+        final long firstEntryId = 1L;
+        final long failingEntryId = firstEntryId + 1;
+
+        ByteBuf firstEntry = entryBuffer(20);
+        when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
+        when(bookie.readEntryIfFits(eq(ledgerId), eq(failingEntryId), anyLong()))
+                .thenThrow(new IOException("disk error"));
+
+        BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024);
+        try {
+            processor.readData();
+            fail("Should propagate the storage failure");
+        } catch (IOException expected) {
+            // A reference count of zero shows the accumulated first entry was not leaked.
+            assertEquals(0, firstEntry.refCnt());
+        }
+    }
+
+    /**
+     * Runs the processor end to end and checks that an {@link IOException} from the bounded read
+     * of the second entry produces an {@code EIO} response carrying no data, and that the first
+     * entry's buffer has been released.
+     */
+    @Test
+    public void testProcessPacketReturnsIoErrorWhenSubsequentBoundedReadFails() throws Exception {
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        // Explicit type argument instead of a raw AtomicReference declaration.
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        // Capture whatever the processor writes to the channel and signal the waiting test.
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            promise.setSuccess();
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class));
+
+        long ledgerId = 1239L;
+        long firstEntryId = 1L;
+        ByteBuf firstEntry = entryBuffer(20);
+        when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry);
+        when(bookie.readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), anyLong()))
+                .thenThrow(new IOException("disk error"));
+
+        BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024);
+        processor.run();
+
+        latch.await();
+        assertTrue(writtenObject.get() instanceof Response);
+        BookieProtocol.BatchedReadResponse response = (BookieProtocol.BatchedReadResponse) writtenObject.get();
+        try {
+            assertEquals(BookieProtocol.EIO, response.getErrorCode());
+            assertEquals(0, response.getData().size());
+            assertEquals(0, firstEntry.refCnt());
+        } finally {
+            response.release();
+        }
+    }
+
+    /**
+     * Builds a {@link BatchedReadEntryProcessor} for a flag-less batched read request at the
+     * current protocol version, backed by a mocked executor service.
+     *
+     * @param ledgerId ledger to read from
+     * @param entryId first entry id of the batch
+     * @param maxCount value passed as the request's maximum entry count
+     * @param maxSize value passed as the request's maximum response size
+     * @return the processor under test
+     */
+    private BatchedReadEntryProcessor createProcessor(long ledgerId, long entryId, int maxCount, long maxSize) {
+        BookieProtocol.BatchedReadRequest request = BookieProtocol.BatchedReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                ledgerId,
+                entryId,
+                BookieProtocol.FLAG_NONE,
+                new byte[] {},
+                0L,
+                maxCount,
+                maxSize);
+        ExecutorService executor = mock(ExecutorService.class);
+        return BatchedReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, executor, true, 5 * 1024 * 1024);
+    }
+
+    /**
+     * Allocates a buffer from the default allocator holding {@code size} readable zero bytes.
+     * The caller owns the returned buffer.
+     */
+    private static ByteBuf entryBuffer(int size) {
+        // writeZero returns the buffer it was called on, so the calls can be chained.
+        return ByteBufAllocator.DEFAULT.buffer(size).writeZero(size);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
index ac338b9757d..d70adfb83f0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
@@ -142,17 +142,28 @@ public ByteBufList batchReadEntries(BookieId bookieId, int flags, long ledgerId,
}
long frameSize = 24 + 8 + 4;
for (long i = startEntryId; i < startEntryId + maxCount; i++) {
- ByteBuf entry = ledger.getEntry(i);
- frameSize += entry.readableBytes() + 4;
- if (data == null) {
- data = ByteBufList.get(entry);
- } else {
- if (frameSize > maxSize) {
- entry.release();
- break;
- }
- data.add(entry);
- }
+ ByteBuf entry = ledger.getEntry(i);
+ if (data == null) {
+ if (entry == null) {
+ LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i);
+ throw new BKException.BKNoSuchEntryException();
+ }
+ frameSize += entry.readableBytes() + 4;
+ data = ByteBufList.get(entry);
+ continue;
+ }
+
+ if (entry == null) {
+ LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i);
+ break;
+ }
+
+ if (frameSize + entry.readableBytes() + 4 > maxSize) {
+ break;
+ }
+
+ frameSize += entry.readableBytes() + 4;
+ data.add(entry);
}
return data;
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java
new file mode 100644
index 00000000000..4c0421f8f13
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.junit.Test;
+
+/**
+ * Tests for the batched-read behaviour of {@link MockBookies}.
+ */
+public class MockBookiesTest {
+
+    private static final BookieId BOOKIE_ID = BookieId.parse("127.0.0.1:3181");
+    private static final long LEDGER_ID = 1L;
+    private static final long BATCH_RESPONSE_HEADER_SIZE = 24 + 8 + 4;
+
+    /**
+     * Asking for two entries when only entry 0 exists must return a one-entry batch.
+     */
+    @Test
+    public void testBatchReadStopsOnMissingSubsequentEntry() throws Exception {
+        final int entrySize = 8;
+        MockBookies bookies = new MockBookies();
+        bookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(entrySize));
+
+        ByteBufList batch = bookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, Long.MAX_VALUE);
+
+        assertNotNull(batch);
+        assertEquals(1, batch.size());
+        assertEquals(entrySize, batch.getBuffer(0).readableBytes());
+    }
+
+    /**
+     * With a limit one byte too small for the second entry, the batch holds only the first entry
+     * and the skipped entry can still be read individually afterwards.
+     */
+    @Test
+    public void testBatchReadDoesNotReleaseOversizedSkippedEntry() throws Exception {
+        final int firstSize = 8;
+        final int secondSize = 16;
+        MockBookies bookies = new MockBookies();
+        bookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(firstSize));
+        bookies.addEntry(BOOKIE_ID, LEDGER_ID, 1L, newEntry(secondSize));
+
+        // Header plus both entry frames (payload + Integer.BYTES each), minus one byte.
+        long maxSize = BATCH_RESPONSE_HEADER_SIZE + firstSize + Integer.BYTES + secondSize + Integer.BYTES - 1;
+        ByteBufList batch = bookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, maxSize);
+
+        assertNotNull(batch);
+        assertEquals(1, batch.size());
+        assertEquals(firstSize, batch.getBuffer(0).readableBytes());
+        assertEquals(secondSize, bookies.readEntry(BOOKIE_ID, 0, LEDGER_ID, 1L).readableBytes());
+    }
+
+    /** Creates an unpooled buffer containing {@code size} readable zero bytes. */
+    private static ByteBuf newEntry(int size) {
+        ByteBuf buffer = Unpooled.buffer(size);
+        return buffer.writeZero(size);
+    }
+}