diff --git a/.gitignore b/.gitignore index 42309e4d62b..e83db003eed 100644 --- a/.gitignore +++ b/.gitignore @@ -50,4 +50,4 @@ build/ *.dat # Downloaded released BookKeeper versions (cached by CI, not committed) -.released-versions/ \ No newline at end of file +.released-versions/ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 90c8acf5af4..b8613fb76be 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -57,6 +57,16 @@ void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte // TODO: Shouldn't this be async? ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException, BookieException; + + /** + * Read a ledger entry only when it can fit the provided bound. + * + *

{@code maxEntrySize} includes the 4-byte per-entry delimiter used by batched-read response framing. + * Implementations return {@code null} when the entry exists but {@code entry.readableBytes() + 4} + * exceeds {@code maxEntrySize}. + */ + ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize) + throws IOException, NoLedgerException, BookieException; long readLastAddConfirmed(long ledgerId) throws IOException, BookieException; PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException; @@ -127,4 +137,4 @@ public long getEntry() { } } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 111f5134e78..359f92c0425 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -1166,6 +1166,36 @@ public ByteBuf readEntry(long ledgerId, long entryId) } } + @Override + public ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize) + throws IOException, NoLedgerException, BookieException { + long requestNanos = MathUtils.nowInNano(); + boolean success = false; + int entrySize = 0; + try { + LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId); + if (LOG.isTraceEnabled()) { + LOG.trace("Reading {}@{} with maxEntrySize {}", entryId, ledgerId, maxEntrySize); + } + ByteBuf entry = handle.readEntryIfFits(entryId, maxEntrySize); + if (entry != null) { + entrySize = entry.readableBytes(); + bookieStats.getReadBytes().addCount(entrySize); + } + success = true; + return entry; + } finally { + long elapsedNanos = MathUtils.elapsedNanos(requestNanos); + if (success) { + bookieStats.getReadEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS); + bookieStats.getReadBytesStats().registerSuccessfulValue(entrySize); + } else { + bookieStats.getReadEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS); + bookieStats.getReadBytesStats().registerFailedValue(entrySize); + } + } + } + public long readLastAddConfirmed(long ledgerId) throws IOException, BookieException { LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId); return handle.getLastAddConfirmed(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 2696aee3c94..e26c3660bfb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -836,6 +836,40 @@ public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryExcept return internalReadEntry(-1L, -1L, location, false /* validateEntry */); } + @Override + public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize) + throws IOException, Bookie.NoEntryException { + long entryLogId = logIdForOffset(entryLocation); + long pos = posForOffset(entryLocation); + + BufferedReadChannel fc = null; + int entrySize; + try { + fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos); + + ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc); + entrySize = sizeBuff.getInt(0); + if (entrySize + Integer.BYTES > maxEntrySize) { + return null; + } + validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff); + } catch (EntryLookupException e) { + throw new IOException("Bad entry read from log file id: " + entryLogId, e); + } + + ByteBuf data = allocator.buffer(entrySize, entrySize); + int rc = readFromLogChannel(entryLogId, fc, data, pos); + if (rc != entrySize) { + ReferenceCountUtil.release(data); + throw new IOException("Bad entry read from log file id: " + entryLogId, + new EntryLookupException("Short read for " + ledgerId + "@" + + entryId + " in " + entryLogId + "@" + + pos + "(" + rc + "!=" + entrySize + ")")); + } + data.writerIndex(entrySize); + return data; + } + private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry) throws IOException, Bookie.NoEntryException { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 4c6b7a9ee4d..550c2393544 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -446,6 +446,44 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { } } + @Override + public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException { + if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) { + entryId = ledgerCache.getLastEntry(ledgerId); + } + + long offset; + long startTimeNanos = MathUtils.nowInNano(); + boolean success = false; + try { + offset = ledgerCache.getEntryOffset(ledgerId, entryId); + if (offset == 0) { + throw new Bookie.NoEntryException(ledgerId, entryId); + } + success = true; + } finally { + if (success) { + getOffsetStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } else { + getOffsetStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + } + + startTimeNanos = MathUtils.nowInNano(); + success = false; + try { + ByteBuf entry = entryLogger.readEntryIfFits(ledgerId, entryId, offset, maxEntrySize); + success = true; + return entry; + } finally { + if (success) { + getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } else { + getEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + } + } + private void flushOrCheckpoint(boolean isCheckpointFlush) throws IOException { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java index 3c84feb876d..cc118c2d98e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java @@ -78,6 +78,21 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) { abstract long addEntry(ByteBuf entry) throws IOException, BookieException; abstract ByteBuf readEntry(long entryId) throws IOException, BookieException; + /** + * Read an entry only when it fits within {@code maxEntrySize}. + * + *

{@code maxEntrySize} includes the 4-byte per-entry delimiter used in batched-read response framing, + * so an exact fit is {@code entry.readableBytes() + 4 == maxEntrySize}. + */ + ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException { + ByteBuf entry = readEntry(entryId); + if (entry.readableBytes() + 4 > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + abstract long getLastAddConfirmed() throws IOException, BookieException; abstract boolean waitForLastAddConfirmedUpdate(long previousLAC, Watcher watcher) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java index 00edfb6a9c2..fe2d483566d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java @@ -161,6 +161,11 @@ ByteBuf readEntry(long entryId) throws IOException, BookieException { return ledgerStorage.getEntry(ledgerId, entryId); } + @Override + ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException { + return ledgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize); + } + @Override long getLastAddConfirmed() throws IOException, BookieException { return ledgerStorage.getLastAddConfirmed(ledgerId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 6eca6e00108..ff18638a63e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -144,6 +144,21 @@ void initialize(ServerConfiguration conf, */ ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException; + /** + * Read an entry from storage only if its serialized size, including the + * 4-byte per-entry framing delimiter, is less than or equal to maxEntrySize. + * + *

Returns {@code null} when the entry exists but does not fit the supplied budget. + */ + default ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException { + ByteBuf entry = getEntry(ledgerId, entryId); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + /** * Get last add confirmed. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 689668eb938..6b7fdd404e8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -239,6 +239,38 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE return buffToRet; } + @Override + public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException { + if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) { + EntryKeyValue kv = memTable.getLastEntry(ledgerId); + if (kv != null) { + ByteBuf entry = kv.getValueAsByteBuffer(); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize); + } + + try { + return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize); + } catch (Bookie.NoEntryException nee) { + EntryKeyValue kv = memTable.getEntry(ledgerId, entryId); + if (kv == null) { + return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize); + } + + ByteBuf entry = kv.getValueAsByteBuffer(); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + } + @Override public long getLastAddConfirmed(long ledgerId) throws IOException { return interleavedLedgerStorage.getLastAddConfirmed(ledgerId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java index c8d127c96ba..b71ab17ff50 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java @@ -67,6 +67,22 @@ ByteBuf readEntry(long entryLocation) ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) throws IOException, NoEntryException; + /** + * Read an entry only if its serialized size, including the 4-byte per-entry + * framing delimiter, is less than or equal to maxEntrySize. + * + *

Returns {@code null} when the entry exists but does not fit the supplied budget. + */ + default ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize) + throws IOException, NoEntryException { + ByteBuf entry = readEntry(ledgerId, entryId, entryLocation); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + /** * Flush any outstanding writes to disk. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java index 035981514e9..a4dcd08b8ae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java @@ -55,6 +55,7 @@ import org.apache.bookkeeper.bookie.storage.EntryLogIds; import org.apache.bookkeeper.bookie.storage.EntryLogScanner; import org.apache.bookkeeper.bookie.storage.EntryLogger; +import org.apache.bookkeeper.common.util.MathUtils; import org.apache.bookkeeper.common.util.nativeio.NativeIO; import org.apache.bookkeeper.slogger.Slogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -211,6 +212,41 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) return internalReadEntry(ledgerId, entryId, entryLocation, true); } + @Override + public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize) + throws IOException, NoEntryException { + int logId = (int) (entryLocation >> 32); + int pos = (int) (entryLocation & 0xFFFFFFFF); + + long start = System.nanoTime(); + LogReader reader = getReader(logId); + + try { + int entrySize = reader.readEntrySizeAt(pos); + if (entrySize + Integer.BYTES > maxEntrySize) { + stats.getReadEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS); + return null; + } + long thisLedgerId = reader.readLongAt(pos); + long thisEntryId = reader.readLongAt(pos + Long.BYTES); + if (thisLedgerId != ledgerId || thisEntryId != entryId) { + throw new IOException( + exMsg("Bad location").kv("location", entryLocation) + .kv("expectedLedger", ledgerId).kv("expectedEntry", entryId) + .kv("foundLedger", thisLedgerId).kv("foundEntry", thisEntryId) + .toString()); + } + ByteBuf buf = reader.readBufferAt(pos, entrySize); + stats.getReadEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS); + return buf; + } catch (EOFException eof) { + stats.getReadEntryStats().registerFailedEvent(MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS); + throw new NoEntryException( + exMsg("Entry location doesn't exist").kv("location", entryLocation).toString(), + ledgerId, entryId); + } + } + private LogReader getReader(int logId) throws IOException { Cache cache = caches.get(); try { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java index 707bf307c05..4ccd6aa9ee3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java @@ -162,7 +162,7 @@ private int readBytesIntoBuf(ByteBuf buf, long offset, int size) throws IOExcept } @Override - public ByteBuf readEntryAt(int offset) throws IOException, EOFException { + public int readEntrySizeAt(int offset) throws IOException, EOFException { assertValidEntryOffset(offset); int sizeOffset = offset - Integer.BYTES; if (sizeOffset < 0) { @@ -188,6 +188,12 @@ public ByteBuf readEntryAt(int offset) throws IOException, EOFException { .kv("maxSaneEntrySize", maxSaneEntrySize) .kv("readEntrySize", entrySize).toString()); } + return entrySize; + } + + @Override + public ByteBuf readEntryAt(int offset) throws IOException, EOFException { + int entrySize = readEntrySizeAt(offset); return readBufferAt(offset, entrySize); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java index 9f865d55699..e37c1606065 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReader.java @@ -63,6 +63,12 @@ public interface LogReader extends AutoCloseable { */ long readLongAt(long offset) throws IOException, EOFException; + /** + * Read the size of an entry at a given offset. + * The size is stored at {@code offset - Integer.BYTES}. + */ + int readEntrySizeAt(int offset) throws IOException, EOFException; + /** * Read an entry at a given offset. * The size of the entry must be at (offset - Integer.BYTES). diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 69964c8f81f..c163f0aa9b2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -426,6 +426,11 @@ public long getLocation(long ledgerId, long entryId) throws IOException { return getLedgerStorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, entryId); } + @Override + public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException { + return getLedgerStorage(ledgerId).getEntryIfFits(ledgerId, entryId, maxEntrySize); + } + private SingleDirectoryDbLedgerStorage getLedgerStorage(long ledgerId) { return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs)); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 774d10c158f..1977cf828a5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -581,6 +581,18 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE } } + public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException { + long startTime = MathUtils.nowInNano(); + try { + ByteBuf entry = doGetEntryIfFits(ledgerId, entryId, maxEntrySize); + recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); + return entry; + } catch (IOException e) { + recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); + throw e; + } + } + private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, BookieException { if (log.isDebugEnabled()) { log.debug("Get Entry: {}@{}", ledgerId, entryId); @@ -665,6 +677,100 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book return entry; } + private ByteBuf doGetEntryIfFits(long ledgerId, long entryId, long maxEntrySize) + throws IOException, BookieException { + if (log.isDebugEnabled()) { + log.debug("Get Entry If Fits: {}@{} maxEntrySize={}", ledgerId, entryId, maxEntrySize); + } + + if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) { + ByteBuf entry = getLastEntry(ledgerId); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + + long stamp = writeCacheRotationLock.tryOptimisticRead(); + WriteCache localWriteCache = writeCache; + WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed; + if (!writeCacheRotationLock.validate(stamp)) { + stamp = writeCacheRotationLock.readLock(); + try { + localWriteCache = writeCache; + localWriteCacheBeingFlushed = writeCacheBeingFlushed; + } finally { + writeCacheRotationLock.unlockRead(stamp); + } + } + + ByteBuf entry = localWriteCache.get(ledgerId, entryId); + if (entry != null) { + dbLedgerStorageStats.getWriteCacheHitCounter().inc(); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + + entry = localWriteCacheBeingFlushed.get(ledgerId, entryId); + if (entry != null) { + dbLedgerStorageStats.getWriteCacheHitCounter().inc(); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + + dbLedgerStorageStats.getWriteCacheMissCounter().inc(); + + entry = readCache.get(ledgerId, entryId); + if (entry != null) { + dbLedgerStorageStats.getReadCacheHitCounter().inc(); + if (entry.readableBytes() + Integer.BYTES > maxEntrySize) { + entry.release(); + return null; + } + return entry; + } + + dbLedgerStorageStats.getReadCacheMissCounter().inc(); + + long entryLocation; + long locationIndexStartNano = MathUtils.nowInNano(); + try { + entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); + if (entryLocation == 0) { + // Only a negative result while in limbo equates to unknown + throwIfLimbo(ledgerId); + + throw new NoEntryException(ledgerId, entryId); + } + } finally { + dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( + MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS); + } + + long readEntryStartNano = MathUtils.nowInNano(); + try { + entry = entryLogger.readEntryIfFits(ledgerId, entryId, entryLocation, maxEntrySize); + } finally { + dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( + MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); + } + + if (entry == null) { + return null; + } + + readCache.put(ledgerId, entryId, entry); + + return entry; + } + private void fillReadAheadCache(long originalLedgerId, long firstEntryId, long firstEntryLocation) { long readAheadStartNano = MathUtils.nowInNano(); int count = 0; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java index 6db3e143519..c4a7072bf82 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java @@ -24,6 +24,7 @@ import io.netty.util.Recycler; import io.netty.util.ReferenceCounted; import java.util.concurrent.ExecutorService; +import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.proto.BookieProtocol.BatchedReadRequest; import org.apache.bookkeeper.util.ByteBufList; @@ -59,22 +60,33 @@ protected ReferenceCounted readData() throws Exception { long frameSize = 24 + 8 + 4; for (int i = 0; i < maxCount; i++) { try { - ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId() + i); - frameSize += entry.readableBytes() + 4; if (data == null) { + ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId()); + frameSize += entry.readableBytes() + 4; data = ByteBufList.get(entry); } else { - if (frameSize > maxSize) { - entry.release(); + long remainingEntrySize = maxSize - frameSize; + if (remainingEntrySize <= 0) { break; } + ByteBuf entry = requestProcessor.getBookie().readEntryIfFits( + request.getLedgerId(), request.getEntryId() + i, remainingEntrySize); + if (entry == null) { + break; + } + frameSize += entry.readableBytes() + 4; data.add(entry); } - } catch (Throwable e) { + } catch (Bookie.NoEntryException e) { if (data == null) { throw e; } break; + } catch (Throwable e) { + if (data != null) { + data.release(); + } + throw e; } } return data; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java index 3f14a33d53f..dc459b53b02 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java @@ -391,6 +391,26 @@ public void testRecoverFromLedgersMap() throws Exception { assertEquals(120, meta.getRemainingSize()); } + @Test + public void testReadEntryIfFitsHonorsFramingBudget() throws Exception { + long ledgerId = 1L; + long entryId = 1L; + ByteBuf entry = makeEntry(ledgerId, entryId, 128); + long location = entryLogger.addEntry(ledgerId, entry.slice()); + entryLogger.flush(); + + assertNull(entryLogger.readEntryIfFits(ledgerId, entryId, location, entry.readableBytes() + 3)); + + ByteBuf actual = entryLogger.readEntryIfFits(ledgerId, entryId, location, entry.readableBytes() + 4); + try { + assertNotNull(actual); + assertEntryEquals(actual, entry); + } finally { + ReferenceCountUtil.release(actual); + ReferenceCountUtil.release(entry); + } + } + @Test public void testLedgersMapIsEmpty() throws Exception { // create some entries diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java index 9c75f9c3ad7..fc49b60d133 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java @@ -24,6 +24,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import io.netty.buffer.ByteBuf; @@ -31,6 +33,7 @@ import io.netty.buffer.UnpooledByteBufAllocator; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -46,6 +49,7 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.test.TestStatsProvider; @@ -110,6 +114,8 @@ void accept(long ledgerId, long pos); } volatile CheckEntryListener testPoint; + volatile int readEntryCalls; + volatile int readEntryIfFitsCalls; public TestableDefaultEntryLogger( ServerConfiguration conf, @@ -123,6 +129,11 @@ void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedExce this.testPoint = testPoint; } + void resetReadTracking() { + readEntryCalls = 0; + readEntryIfFitsCalls = 0; + } + @Override void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException { CheckEntryListener runBefore = testPoint; @@ -131,6 +142,20 @@ void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupEx } super.checkEntry(ledgerId, entryId, location); } + + @Override + public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) + throws IOException, Bookie.NoEntryException { + readEntryCalls++; + return super.readEntry(ledgerId, entryId, entryLocation); + } + + @Override + public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize) + throws IOException, Bookie.NoEntryException { + readEntryIfFitsCalls++; + return super.readEntryIfFits(ledgerId, entryId, entryLocation, maxEntrySize); + } } TestStatsProvider statsProvider = new TestStatsProvider(); @@ -244,6 +269,66 @@ public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException { } } + @Test + public void testGetEntryIfFitsUsesBoundedLoggerRead() throws Exception { + long ledgerId = 0L; + long entryId = 0L; + long exactFitSize = Long.BYTES * 2 + "entry-0".getBytes(StandardCharsets.UTF_8).length + Integer.BYTES; + + entryLogger.resetReadTracking(); + ByteBuf oversized = interleavedStorage.getEntryIfFits(ledgerId, entryId, exactFitSize - 1); + assertNull(oversized); + assertEquals(0, entryLogger.readEntryCalls); + assertEquals(1, entryLogger.readEntryIfFitsCalls); + + entryLogger.resetReadTracking(); + ByteBuf exactFit = interleavedStorage.getEntryIfFits(ledgerId, entryId, exactFitSize); + try { + assertNotNull(exactFit); + assertEquals(ledgerId, exactFit.getLong(0)); + assertEquals(entryId, exactFit.getLong(Long.BYTES)); + assertEquals("entry-0", exactFit.toString(Long.BYTES * 2, + exactFit.readableBytes() - (Long.BYTES * 2), StandardCharsets.UTF_8)); + assertEquals(0, entryLogger.readEntryCalls); + assertEquals(1, entryLogger.readEntryIfFitsCalls); + } finally { + if (exactFit != null) { + exactFit.release(); + } + } + } + + @Test + public void testGetEntryIfFitsHonorsBudgetForLastAddConfirmed() throws Exception { + long ledgerId = 0L; + ByteBuf lastEntry = interleavedStorage.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); + long exactFitSize = lastEntry.readableBytes() + Integer.BYTES; + long lastEntryId = lastEntry.getLong(Long.BYTES); + lastEntry.release(); + + entryLogger.resetReadTracking(); + ByteBuf oversized = interleavedStorage.getEntryIfFits(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, + exactFitSize - 1); + assertNull(oversized); + assertEquals(0, entryLogger.readEntryCalls); + assertEquals(1, entryLogger.readEntryIfFitsCalls); + + entryLogger.resetReadTracking(); + ByteBuf exactFit = interleavedStorage.getEntryIfFits(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, + exactFitSize); + try { + assertNotNull(exactFit); + assertEquals(ledgerId, exactFit.getLong(0)); + assertEquals(lastEntryId, exactFit.getLong(Long.BYTES)); + assertEquals(0, entryLogger.readEntryCalls); + assertEquals(1, entryLogger.readEntryIfFitsCalls); + } finally { + if (exactFit != null) { + exactFit.release(); + } + } + } + @Test public void testConsistencyCheckConcurrentGC() throws Exception { final long signalDone = -1; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java index db83f096d95..3885451e99a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java @@ -23,6 +23,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import io.netty.buffer.ByteBuf; @@ -38,12 +40,14 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.test.TestStatsProvider; import org.apache.bookkeeper.util.DiskChecker; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; /** * Testing SortedLedgerStorage. @@ -191,4 +195,47 @@ public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException { })); } } + + @Test + public void testGetEntryIfFitsHonorsBudgetForLastAddConfirmed() throws Exception { + long ledgerId = 0L; + long lastEntryId = 1234L; + ByteBuf entry = Unpooled.buffer(128); + entry.writeLong(ledgerId); + entry.writeLong(lastEntryId); + entry.writeBytes("lac-entry".getBytes()); + + byte[] bytes = new byte[entry.readableBytes()]; + entry.getBytes(entry.readerIndex(), bytes); + + InterleavedLedgerStorage interleaved = Mockito.mock(InterleavedLedgerStorage.class); + SortedLedgerStorage storage = new SortedLedgerStorage(interleaved); + storage.memTable = Mockito.mock(EntryMemTable.class); + Mockito.when(storage.memTable.getLastEntry(ledgerId)) + .thenReturn(new EntryKeyValue(ledgerId, lastEntryId, bytes)); + + long exactFitSize = entry.readableBytes() + Integer.BYTES; + try { + ByteBuf oversized = storage.getEntryIfFits(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, + exactFitSize - 1); + assertNull(oversized); + + ByteBuf exactFit = storage.getEntryIfFits(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, + exactFitSize); + try { + assertNotNull(exactFit); + assertEquals(ledgerId, exactFit.getLong(0)); + assertEquals(lastEntryId, exactFit.getLong(Long.BYTES)); + Mockito.verifyNoInteractions(interleaved); + } finally { + if (exactFit != null) { + exactFit.release(); + } + } + } finally { + if (entry != null) { + entry.release(); + } + } + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index dfc2459678b..7b17c9a2c08 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -35,8 +36,10 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.BookieException; @@ -49,6 +52,8 @@ import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.LogMark; import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; @@ -70,6 +75,134 @@ public class DbLedgerStorageTest { protected LedgerDirsManager ledgerDirsManager; protected ServerConfiguration conf; + protected static class TrackingDbLedgerStorage extends DbLedgerStorage { + static final AtomicInteger READ_ENTRY_CALLS = new AtomicInteger(); + static final AtomicInteger READ_ENTRY_IF_FITS_CALLS = new AtomicInteger(); + + static void resetReadTracking() { + READ_ENTRY_CALLS.set(0); + READ_ENTRY_IF_FITS_CALLS.set(0); + } + + @Override + protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, + org.apache.bookkeeper.meta.LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, + LedgerDirsManager indexDirsManager, + EntryLogger entryLogger, + org.apache.bookkeeper.stats.StatsLogger statsLogger, + long writeCacheSize, + long readCacheSize, + int readAheadCacheBatchSize, + long readAheadCacheBatchBytesSize) throws IOException { + return new SingleDirectoryDbLedgerStorage( + conf, + ledgerManager, + ledgerDirsManager, + indexDirsManager, + new TrackingEntryLogger(entryLogger), + statsLogger, + allocator, + writeCacheSize, + readCacheSize, + readAheadCacheBatchSize, + readAheadCacheBatchBytesSize); + } + } + + protected static class TrackingEntryLogger implements EntryLogger { + private final EntryLogger delegate; + + TrackingEntryLogger(EntryLogger delegate) { + this.delegate = delegate; + } + + EntryLogger getDelegate() { + return delegate; + } + + @Override + public long addEntry(long ledgerId, ByteBuf buf) throws IOException { + return delegate.addEntry(ledgerId, buf); + } + + @Override + public ByteBuf readEntry(long entryLocation) throws IOException, NoEntryException { + return delegate.readEntry(entryLocation); + } + + @Override + public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) throws IOException, NoEntryException { + TrackingDbLedgerStorage.READ_ENTRY_CALLS.incrementAndGet(); + return delegate.readEntry(ledgerId, entryId, entryLocation); + } + + @Override + public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize) + throws IOException, NoEntryException { + TrackingDbLedgerStorage.READ_ENTRY_IF_FITS_CALLS.incrementAndGet(); + return delegate.readEntryIfFits(ledgerId, entryId, entryLocation, maxEntrySize); + } + + @Override + public void flush() throws IOException { + delegate.flush(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public CompactionEntryLog newCompactionLog(long logToCompact) throws IOException { + return delegate.newCompactionLog(logToCompact); + } + + @Override + public Collection incompleteCompactionLogs() { + return delegate.incompleteCompactionLogs(); + } + + @Override + public Collection getFlushedLogIds() { + return delegate.getFlushedLogIds(); + } + + @Override + public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException { + delegate.scanEntryLog(entryLogId, scanner); + } + + @Override + public org.apache.bookkeeper.bookie.EntryLogMetadata getEntryLogMetadata( + long entryLogId, + org.apache.bookkeeper.bookie.AbstractLogCompactor.Throttler throttler) throws IOException { + return delegate.getEntryLogMetadata(entryLogId, throttler); + } + + @Override + public boolean logExists(long logId) { + return delegate.logExists(logId); + } + + @Override + public boolean removeEntryLog(long entryLogId) { + return delegate.removeEntryLog(entryLogId); + } + } + + protected Class getLedgerStorageClass() { + return TrackingDbLedgerStorage.class; + } + + protected EntryLogger unwrapTrackingEntryLogger(EntryLogger entryLogger) { + if (entryLogger instanceof TrackingEntryLogger) { + return ((TrackingEntryLogger) entryLogger).getDelegate(); + } + return entryLogger; + } + @Before public void setup() throws Exception { tmpDir = File.createTempFile("bkTest", ".dir"); @@ -81,15 +214,17 @@ public void setup() throws Exception { int gcWaitTime = 1000; conf = TestBKConfiguration.newServerConfiguration(); conf.setGcWaitTime(gcWaitTime); - conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerStorageClass(getLedgerStorageClass().getName()); conf.setLedgerDirNames(new String[] { tmpDir.toString() }); BookieImpl bookie = new TestBookieImpl(conf); ledgerDirsManager = bookie.getLedgerDirsManager(); storage = (DbLedgerStorage) bookie.getLedgerStorage(); + TrackingDbLedgerStorage.resetReadTracking(); storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> { - assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DefaultEntryLogger); + assertTrue(unwrapTrackingEntryLogger(singleDirectoryDbLedgerStorage.getEntryLogger()) + instanceof DefaultEntryLogger); }); } @@ -255,6 +390,74 @@ public void testBookieCompaction() throws Exception { assertEquals(newEntry3, res); } + @Test + public void testGetEntryIfFitsReturnsNullFromWriteCache() throws Exception { + storage.setMasterKey(4L, "key".getBytes()); + ByteBuf entry = Unpooled.buffer(1024); + entry.writeLong(4L); + entry.writeLong(10L); + entry.writeLong(9L); + entry.writeBytes("cache-entry".getBytes()); + storage.addEntry(entry.retainedDuplicate()); + + ByteBuf result = storage.getEntryIfFits(4L, 10L, entry.readableBytes() + Integer.BYTES - 1L); + try { + assertNull(result); + } finally { + ReferenceCountUtil.release(result); + ReferenceCountUtil.release(entry); + } + } + + @Test + public void testGetEntryIfFitsUsesBoundedReadFromEntryLog() throws Exception { + storage.setMasterKey(4L, "key".getBytes()); + ByteBuf entry = Unpooled.buffer(1024); + entry.writeLong(4L); + entry.writeLong(12L); + entry.writeLong(11L); + entry.writeBytes("entry-log-entry".getBytes()); + storage.addEntry(entry.retainedDuplicate()); + storage.flush(); + + TrackingDbLedgerStorage.resetReadTracking(); + ByteBuf result = storage.getEntryIfFits(4L, 12L, entry.readableBytes() + Integer.BYTES - 1L); + try { + assertNull(result); + assertEquals(0, TrackingDbLedgerStorage.READ_ENTRY_CALLS.get()); + assertEquals(1, TrackingDbLedgerStorage.READ_ENTRY_IF_FITS_CALLS.get()); + } finally { + ReferenceCountUtil.release(result); + ReferenceCountUtil.release(entry); + } + } + + @Test + public void testGetEntryIfFitsReturnsNullFromReadCache() throws Exception { + storage.setMasterKey(4L, "key".getBytes()); + ByteBuf entry = Unpooled.buffer(1024); + entry.writeLong(4L); + entry.writeLong(11L); + entry.writeLong(10L); + entry.writeBytes("read-cache-entry".getBytes()); + storage.addEntry(entry.retainedDuplicate()); + storage.flush(); + + ByteBuf warm = storage.getEntry(4L, 11L); + warm.release(); + + TrackingDbLedgerStorage.resetReadTracking(); + ByteBuf result = storage.getEntryIfFits(4L, 11L, entry.readableBytes() + Integer.BYTES - 1L); + try { + assertNull(result); + assertEquals(0, TrackingDbLedgerStorage.READ_ENTRY_CALLS.get()); + assertEquals(0, TrackingDbLedgerStorage.READ_ENTRY_IF_FITS_CALLS.get()); + } finally { + ReferenceCountUtil.release(result); + ReferenceCountUtil.release(entry); + } + } + @Test public void doubleDirectory() throws Exception { int gcWaitTime = 1000; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java index 76192cf2b40..554a83188f8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java @@ -46,16 +46,18 @@ public void setup() throws Exception { int gcWaitTime = 1000; conf = TestBKConfiguration.newServerConfiguration(); conf.setGcWaitTime(gcWaitTime); - conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerStorageClass(getLedgerStorageClass().getName()); conf.setLedgerDirNames(new String[] { tmpDir.toString() }); conf.setProperty("dbStorage_directIOEntryLogger", true); BookieImpl bookie = new TestBookieImpl(conf); ledgerDirsManager = bookie.getLedgerDirsManager(); storage = (DbLedgerStorage) bookie.getLedgerStorage(); + TrackingDbLedgerStorage.resetReadTracking(); storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> { - assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DirectEntryLogger); + assertTrue(unwrapTrackingEntryLogger(singleDirectoryDbLedgerStorage.getEntryLogger()) + instanceof DirectEntryLogger); }); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 93ace7eacb1..75c9565f260 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -816,6 +816,16 @@ public void testBatchReadWithV2Protocol() throws Exception { entries++; } assertEquals(expectEntriesNum, entries); + + // The first entry is still returned even when maxSize is smaller than a single entry frame. + entries = 0; + for (Enumeration readEntries = lh.batchReadEntries(0, 20, headerSize); + readEntries.hasMoreElements();) { + LedgerEntry entry = readEntries.nextElement(); + assertArrayEquals(data, entry.getEntry()); + entries++; + } + assertEquals(1, entries); } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java index 3f897558384..4e9db2c6318 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java @@ -19,11 +19,15 @@ package org.apache.bookkeeper.proto; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +50,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.proto.BookieProtocol.Response; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.ByteBufList; import org.junit.Before; import org.junit.Test; @@ -221,4 +226,171 @@ public void testNonFenceRequest() throws Exception { assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode()); assertEquals(BookieProtocol.EOK, response.getErrorCode()); } -} \ No newline at end of file + + @Test + public void testReadDataReturnsFirstEntryWhenSecondWouldOverflowMaxSize() throws Exception { + long ledgerId = 1234L; + long firstEntryId = 1L; + int firstEntrySize = 20; + long maxSize = 70; + long expectedRemainingBudget = 10; + + ByteBuf firstEntry = entryBuffer(firstEntrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), eq(expectedRemainingBudget))).thenReturn(null); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(1, data.size()); + } finally { + data.release(); + } + + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId)); + verify(bookie, times(1)).readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), eq(expectedRemainingBudget)); + verify(bookie, times(1)).readEntry(anyLong(), anyLong()); + } + + @Test + public void testReadDataReturnsFirstEntryEvenIfItAloneExceedsMaxSize() throws Exception { + long ledgerId = 1235L; + long firstEntryId = 1L; + int firstEntrySize = 20; + long maxSize = 50; + + ByteBuf firstEntry = entryBuffer(firstEntrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(1, data.size()); + } finally { + data.release(); + } + + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId)); + verify(bookie, never()).readEntryIfFits(anyLong(), anyLong(), anyLong()); + } + + @Test + public void testReadDataIncludesSecondEntryWhenRemainingBudgetExactlyFitsEntryAndDelimiter() throws Exception { + long ledgerId = 1236L; + long firstEntryId = 1L; + int firstEntrySize = 20; + int secondEntrySize = 12; + long exactRemainingBudget = secondEntrySize + 4L; + long maxSize = 24 + 8 + 4 + firstEntrySize + 4 + exactRemainingBudget; + + ByteBuf firstEntry = entryBuffer(firstEntrySize); + ByteBuf secondEntry = entryBuffer(secondEntrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), eq(exactRemainingBudget))) + .thenReturn(secondEntry); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(2, data.size()); + } finally { + data.release(); + } + + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId)); + verify(bookie, times(1)).readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), eq(exactRemainingBudget)); + verify(bookie, times(1)).readEntry(anyLong(), anyLong()); + } + + @Test + public void testReadDataStopsOnMissingSubsequentEntry() throws Exception { + long ledgerId = 1237L; + long firstEntryId = 1L; + int firstEntrySize = 20; + + ByteBuf firstEntry = entryBuffer(firstEntrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), anyLong())) + .thenThrow(new Bookie.NoEntryException(ledgerId, firstEntryId + 1)); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(1, data.size()); + } finally { + data.release(); + } + } + + @Test + public void testReadDataPropagatesIOExceptionAfterFirstEntryAndReleasesAccumulatedData() throws Exception { + long ledgerId = 1238L; + long firstEntryId = 1L; + + ByteBuf firstEntry = entryBuffer(20); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), anyLong())) + .thenThrow(new IOException("disk error")); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024); + try { + processor.readData(); + fail("Should propagate the storage failure"); + } catch (IOException expected) { + assertEquals(0, firstEntry.refCnt()); + } + } + + @Test + public void testProcessPacketReturnsIoErrorWhenSubsequentBoundedReadFails() throws Exception { + ChannelPromise promise = new DefaultChannelPromise(channel); + AtomicReference writtenObject = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + writtenObject.set(invocationOnMock.getArgument(0)); + promise.setSuccess(); + latch.countDown(); + return promise; + }).when(channel).writeAndFlush(any(Response.class)); + + long ledgerId = 1239L; + long firstEntryId = 1L; + ByteBuf firstEntry = entryBuffer(20); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntryIfFits(eq(ledgerId), eq(firstEntryId + 1), anyLong())) + .thenThrow(new IOException("disk error")); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024); + processor.run(); + + latch.await(); + assertTrue(writtenObject.get() instanceof Response); + BookieProtocol.BatchedReadResponse response = (BookieProtocol.BatchedReadResponse) writtenObject.get(); + try { + assertEquals(BookieProtocol.EIO, response.getErrorCode()); + assertEquals(0, response.getData().size()); + assertEquals(0, firstEntry.refCnt()); + } finally { + response.release(); + } + } + + private BatchedReadEntryProcessor createProcessor(long ledgerId, long entryId, int maxCount, long maxSize) { + ExecutorService service = mock(ExecutorService.class); + BookieProtocol.BatchedReadRequest request = BookieProtocol.BatchedReadRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, BookieProtocol.FLAG_NONE, new byte[] {}, + 0L, maxCount, maxSize); + return BatchedReadEntryProcessor.create(request, requestHandler, requestProcessor, service, true, + 5 * 1024 * 1024); + } + + private static ByteBuf entryBuffer(int size) { + ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size); + entry.writeZero(size); + return entry; + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java index ac338b9757d..d70adfb83f0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java @@ -142,17 +142,28 @@ public ByteBufList batchReadEntries(BookieId bookieId, int flags, long ledgerId, } long frameSize = 24 + 8 + 4; for (long i = startEntryId; i < startEntryId + maxCount; i++) { - ByteBuf entry = ledger.getEntry(i); - frameSize += entry.readableBytes() + 4; - if (data == null) { - data = ByteBufList.get(entry); - } else { - if (frameSize > maxSize) { - entry.release(); - break; - } - data.add(entry); - } + ByteBuf entry = ledger.getEntry(i); + if (data == null) { + if (entry == null) { + LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i); + throw new BKException.BKNoSuchEntryException(); + } + frameSize += entry.readableBytes() + 4; + data = ByteBufList.get(entry); + continue; + } + + if (entry == null) { + LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i); + break; + } + + if (frameSize + entry.readableBytes() + 4 > maxSize) { + break; + } + + frameSize += entry.readableBytes() + 4; + data.add(entry); } return data; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java new file mode 100644 index 00000000000..4c0421f8f13 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.proto; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.util.ByteBufList; +import org.junit.Test; + +public class MockBookiesTest { + + private static final BookieId BOOKIE_ID = BookieId.parse("127.0.0.1:3181"); + private static final long LEDGER_ID = 1L; + private static final long BATCH_RESPONSE_HEADER_SIZE = 24 + 8 + 4; + + @Test + public void testBatchReadStopsOnMissingSubsequentEntry() throws Exception { + MockBookies mockBookies = new MockBookies(); + mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(8)); + + ByteBufList data = mockBookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, Long.MAX_VALUE); + + assertNotNull(data); + assertEquals(1, data.size()); + assertEquals(8, data.getBuffer(0).readableBytes()); + } + + @Test + public void testBatchReadDoesNotReleaseOversizedSkippedEntry() throws Exception { + MockBookies mockBookies = new MockBookies(); + mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(8)); + mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 1L, newEntry(16)); + + long maxSize = BATCH_RESPONSE_HEADER_SIZE + 8 + Integer.BYTES + 16 + Integer.BYTES - 1; + ByteBufList data = mockBookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, maxSize); + + assertNotNull(data); + assertEquals(1, data.size()); + assertEquals(8, data.getBuffer(0).readableBytes()); + assertEquals(16, mockBookies.readEntry(BOOKIE_ID, 0, LEDGER_ID, 1L).readableBytes()); + } + + private static ByteBuf newEntry(int size) { + return Unpooled.buffer(size).writeZero(size); + } +}