Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ build/
*.dat

# Downloaded released BookKeeper versions (cached by CI, not committed)
.released-versions/
.released-versions/
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte
// TODO: Shouldn't this be async?
ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException, BookieException;

/**
 * Read a ledger entry only when it can fit the provided bound.
 *
 * <p>{@code maxEntrySize} includes the 4-byte per-entry delimiter used by batched-read response framing.
 * Implementations return {@code null} when the entry exists but {@code entry.readableBytes() + 4}
 * exceeds {@code maxEntrySize}.
 *
 * @param ledgerId id of the ledger to read from
 * @param entryId id of the entry to read
 * @param maxEntrySize size budget in bytes, counting the 4-byte per-entry delimiter
 * @return the entry, or {@code null} when the entry exists but does not fit {@code maxEntrySize}
 */
ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize)
throws IOException, NoLedgerException, BookieException;
long readLastAddConfirmed(long ledgerId) throws IOException, BookieException;
PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException;

Expand Down Expand Up @@ -127,4 +137,4 @@ public long getEntry() {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,36 @@ public ByteBuf readEntry(long ledgerId, long entryId)
}
}

/**
 * Reads an entry through the ledger's read-only handle, but only if it fits {@code maxEntrySize}.
 *
 * <p>Read latency is always recorded; read-byte counters are only bumped when an entry is
 * actually returned. A {@code null} result (entry did not fit) still counts as a successful read
 * of zero bytes.
 *
 * @return the entry, or {@code null} if the handle reported that it does not fit
 */
@Override
public ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize)
        throws IOException, NoLedgerException, BookieException {
    final long startNanos = MathUtils.nowInNano();
    boolean completed = false;
    int bytesRead = 0;
    try {
        LedgerDescriptor descriptor = handles.getReadOnlyHandle(ledgerId);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Reading {}@{} with maxEntrySize {}", entryId, ledgerId, maxEntrySize);
        }
        ByteBuf result = descriptor.readEntryIfFits(entryId, maxEntrySize);
        if (result != null) {
            bytesRead = result.readableBytes();
            bookieStats.getReadBytes().addCount(bytesRead);
        }
        completed = true;
        return result;
    } finally {
        // Runs on both the normal and the exceptional path, so every request is timed.
        long tookNanos = MathUtils.elapsedNanos(startNanos);
        if (!completed) {
            bookieStats.getReadEntryStats().registerFailedEvent(tookNanos, TimeUnit.NANOSECONDS);
            bookieStats.getReadBytesStats().registerFailedValue(bytesRead);
        } else {
            bookieStats.getReadEntryStats().registerSuccessfulEvent(tookNanos, TimeUnit.NANOSECONDS);
            bookieStats.getReadBytesStats().registerSuccessfulValue(bytesRead);
        }
    }
}

public long readLastAddConfirmed(long ledgerId) throws IOException, BookieException {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
return handle.getLastAddConfirmed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,40 @@ public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryExcept
return internalReadEntry(-1L, -1L, location, false /* validateEntry */);
}

/**
 * Reads the entry stored at {@code entryLocation} only if it fits within {@code maxEntrySize}.
 *
 * <p>The entry header is read and validated first, and only then is the stored size compared
 * against the budget. This ordering ensures that a corrupt or mismatched header surfaces as an
 * {@link IOException} rather than being reported as "entry exists but does not fit". The
 * payload buffer is allocated only once the entry is known to fit.
 *
 * @param ledgerId ledger the caller expects at this location
 * @param entryId entry the caller expects at this location
 * @param entryLocation encoded location (log id and position) of the entry
 * @param maxEntrySize size budget in bytes, counting the 4-byte per-entry delimiter
 * @return the entry payload, or {@code null} when the entry does not fit {@code maxEntrySize}
 * @throws IOException if the header fails lookup/validation or the payload read is short
 */
@Override
public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
        throws IOException, Bookie.NoEntryException {
    long entryLogId = logIdForOffset(entryLocation);
    long pos = posForOffset(entryLocation);

    BufferedReadChannel fc = null;
    int entrySize;
    try {
        fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);

        ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
        entrySize = sizeBuff.getInt(0);
        // Validate before acting on the size: the value comes straight from disk and must not
        // drive the fit decision until the header has been checked.
        validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff);
    } catch (EntryLookupException e) {
        throw new IOException("Bad entry read from log file id: " + entryLogId, e);
    }

    // Widen to long so that adding the delimiter size can never wrap around.
    if ((long) entrySize + Integer.BYTES > maxEntrySize) {
        return null;
    }

    ByteBuf data = allocator.buffer(entrySize, entrySize);
    int rc = readFromLogChannel(entryLogId, fc, data, pos);
    if (rc != entrySize) {
        // Do not leak the freshly allocated buffer on a short read.
        ReferenceCountUtil.release(data);
        throw new IOException("Bad entry read from log file id: " + entryLogId,
                new EntryLookupException("Short read for " + ledgerId + "@"
                        + entryId + " in " + entryLogId + "@"
                        + pos + "(" + rc + "!=" + entrySize + ")"));
    }
    data.writerIndex(entrySize);
    return data;
}


private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
throws IOException, Bookie.NoEntryException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,44 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
}
}

/**
 * Looks up the entry's offset in the ledger cache and reads it from the entry logger only if it
 * fits {@code maxEntrySize}.
 *
 * <p>{@link BookieProtocol#LAST_ADD_CONFIRMED} is first resolved to the ledger's last entry id.
 * The offset lookup and the entry read are timed separately.
 *
 * @return the entry, or {@code null} when the entry logger reports that it does not fit
 * @throws Bookie.NoEntryException when the ledger cache has no offset (0) for the entry
 */
@Override
public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
    if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
        entryId = ledgerCache.getLastEntry(ledgerId);
    }

    // Phase 1: resolve the entry's offset in the index.
    long offset;
    long lookupStartNanos = MathUtils.nowInNano();
    boolean offsetResolved = false;
    try {
        offset = ledgerCache.getEntryOffset(ledgerId, entryId);
        if (offset == 0) {
            throw new Bookie.NoEntryException(ledgerId, entryId);
        }
        offsetResolved = true;
    } finally {
        long lookupNanos = MathUtils.elapsedNanos(lookupStartNanos);
        if (!offsetResolved) {
            getOffsetStats.registerFailedEvent(lookupNanos, TimeUnit.NANOSECONDS);
        } else {
            getOffsetStats.registerSuccessfulEvent(lookupNanos, TimeUnit.NANOSECONDS);
        }
    }

    // Phase 2: read the entry itself, bounded by maxEntrySize.
    long readStartNanos = MathUtils.nowInNano();
    boolean readCompleted = false;
    try {
        ByteBuf result = entryLogger.readEntryIfFits(ledgerId, entryId, offset, maxEntrySize);
        readCompleted = true;
        return result;
    } finally {
        long readNanos = MathUtils.elapsedNanos(readStartNanos);
        if (!readCompleted) {
            getEntryStats.registerFailedEvent(readNanos, TimeUnit.NANOSECONDS);
        } else {
            getEntryStats.registerSuccessfulEvent(readNanos, TimeUnit.NANOSECONDS);
        }
    }
}

private void flushOrCheckpoint(boolean isCheckpointFlush)
throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) {
abstract long addEntry(ByteBuf entry) throws IOException, BookieException;
abstract ByteBuf readEntry(long entryId) throws IOException, BookieException;

/**
 * Read an entry only when it fits within {@code maxEntrySize}.
 *
 * <p>{@code maxEntrySize} includes the 4-byte per-entry delimiter used in batched-read response framing,
 * so an exact fit is {@code entry.readableBytes() + 4 == maxEntrySize}.
 *
 * <p>This base implementation reads the whole entry via {@link #readEntry(long)} and releases it
 * again when it turns out to be too large.
 *
 * @param entryId id of the entry to read
 * @param maxEntrySize size budget in bytes, counting the 4-byte delimiter
 * @return the entry, or {@code null} when it does not fit
 */
ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException {
    ByteBuf candidate = readEntry(entryId);
    boolean fits = candidate.readableBytes() + Integer.BYTES <= maxEntrySize;
    if (fits) {
        return candidate;
    }
    // Too large for the budget: drop our reference before signalling "does not fit".
    candidate.release();
    return null;
}

abstract long getLastAddConfirmed() throws IOException, BookieException;
abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ ByteBuf readEntry(long entryId) throws IOException, BookieException {
return ledgerStorage.getEntry(ledgerId, entryId);
}

/**
 * Delegates the bounded read to the ledger storage, supplying this descriptor's ledger id.
 */
@Override
ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException {
    ByteBuf boundedEntry = ledgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
    return boundedEntry;
}

@Override
long getLastAddConfirmed() throws IOException, BookieException {
return ledgerStorage.getLastAddConfirmed(ledgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,21 @@ void initialize(ServerConfiguration conf,
*/
ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException;

/**
 * Read an entry from storage only if its serialized size, including the
 * 4-byte per-entry framing delimiter, is less than or equal to maxEntrySize.
 *
 * <p>Returns {@code null} when the entry exists but does not fit the supplied budget.
 *
 * <p>The default implementation fetches the full entry with
 * {@link #getEntry(long, long)} and releases it if it is too large.
 *
 * @param ledgerId id of the ledger to read from
 * @param entryId id of the entry to read
 * @param maxEntrySize size budget in bytes, counting the 4-byte delimiter
 * @return the entry, or {@code null} when it does not fit
 */
default ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
    ByteBuf candidate = getEntry(ledgerId, entryId);
    boolean fits = candidate.readableBytes() + Integer.BYTES <= maxEntrySize;
    if (fits) {
        return candidate;
    }
    // Over budget: give the buffer back before reporting "does not fit".
    candidate.release();
    return null;
}

/**
* Get last add confirmed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,38 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE
return buffToRet;
}

/**
 * Bounded read that consults both the interleaved storage and the memtable.
 *
 * <p>For {@link BookieProtocol#LAST_ADD_CONFIRMED} the memtable's last entry is tried first.
 * For any other entry id the interleaved storage is tried first and the memtable is consulted
 * only after a {@link Bookie.NoEntryException}. Whenever the memtable has nothing, the request
 * is (re)issued to the interleaved storage.
 *
 * @return the entry, or {@code null} when it exists but does not fit {@code maxEntrySize}
 */
@Override
public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
    EntryKeyValue cached;
    if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
        cached = memTable.getLastEntry(ledgerId);
    } else {
        try {
            return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
        } catch (Bookie.NoEntryException nee) {
            cached = memTable.getEntry(ledgerId, entryId);
        }
    }

    if (cached == null) {
        // Nothing in the memtable: let the interleaved storage answer (or throw).
        return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
    }

    // Memtable hit: apply the same fit rule, counting the 4-byte delimiter.
    ByteBuf candidate = cached.getValueAsByteBuffer();
    if (candidate.readableBytes() + Integer.BYTES <= maxEntrySize) {
        return candidate;
    }
    candidate.release();
    return null;
}

@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
return interleavedLedgerStorage.getLastAddConfirmed(ledgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ ByteBuf readEntry(long entryLocation)
ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
throws IOException, NoEntryException;

/**
 * Read an entry only if its serialized size, including the 4-byte per-entry
 * framing delimiter, is less than or equal to maxEntrySize.
 *
 * <p>Returns {@code null} when the entry exists but does not fit the supplied budget.
 *
 * <p>The default implementation reads the full entry with
 * {@link #readEntry(long, long, long)} and releases it if it is too large.
 *
 * @param ledgerId ledger the entry belongs to
 * @param entryId id of the entry
 * @param entryLocation location of the entry in the entry log
 * @param maxEntrySize size budget in bytes, counting the 4-byte delimiter
 * @return the entry, or {@code null} when it does not fit
 */
default ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
        throws IOException, NoEntryException {
    ByteBuf candidate = readEntry(ledgerId, entryId, entryLocation);
    boolean fits = candidate.readableBytes() + Integer.BYTES <= maxEntrySize;
    if (fits) {
        return candidate;
    }
    // Over budget: release our reference before reporting "does not fit".
    candidate.release();
    return null;
}

/**
* Flush any outstanding writes to disk.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.bookkeeper.bookie.storage.EntryLogIds;
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
import org.apache.bookkeeper.slogger.Slogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand Down Expand Up @@ -211,6 +212,41 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
return internalReadEntry(ledgerId, entryId, entryLocation, true);
}

/**
 * Reads the entry at {@code entryLocation} only if it fits {@code maxEntrySize}.
 *
 * <p>The entry size is read first. If the entry is too large, {@code null} is returned without
 * reading the ledger/entry ids or the payload, and the read is still recorded as successful.
 * Otherwise the ids stored at the location are checked against the expected ones before the
 * payload is read.
 *
 * @return the entry payload, or {@code null} when it does not fit
 * @throws IOException when the ids found at the location differ from the expected ones
 * @throws NoEntryException when the reader hits end-of-file at the location
 */
@Override
public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
        throws IOException, NoEntryException {
    // High 32 bits of the location select the log, low 32 bits are the position inside it.
    int logId = (int) (entryLocation >> 32);
    int pos = (int) (entryLocation & 0xFFFFFFFFL);

    long startNanos = System.nanoTime();
    LogReader logReader = getReader(logId);

    try {
        int size = logReader.readEntrySizeAt(pos);
        ByteBuf payload;
        if (size + Integer.BYTES > maxEntrySize) {
            payload = null;
        } else {
            long foundLedger = logReader.readLongAt(pos);
            long foundEntry = logReader.readLongAt(pos + Long.BYTES);
            boolean matches = foundLedger == ledgerId && foundEntry == entryId;
            if (!matches) {
                throw new IOException(
                        exMsg("Bad location").kv("location", entryLocation)
                                .kv("expectedLedger", ledgerId).kv("expectedEntry", entryId)
                                .kv("foundLedger", foundLedger).kv("foundEntry", foundEntry)
                                .toString());
            }
            payload = logReader.readBufferAt(pos, size);
        }
        stats.getReadEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startNanos), TimeUnit.NANOSECONDS);
        return payload;
    } catch (EOFException eof) {
        stats.getReadEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startNanos), TimeUnit.NANOSECONDS);
        throw new NoEntryException(
                exMsg("Entry location doesn't exist").kv("location", entryLocation).toString(),
                ledgerId, entryId);
    }
}

private LogReader getReader(int logId) throws IOException {
Cache<Integer, LogReader> cache = caches.get();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private int readBytesIntoBuf(ByteBuf buf, long offset, int size) throws IOExcept
}

@Override
public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
public int readEntrySizeAt(int offset) throws IOException, EOFException {
assertValidEntryOffset(offset);
int sizeOffset = offset - Integer.BYTES;
if (sizeOffset < 0) {
Expand All @@ -188,6 +188,12 @@ public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
.kv("maxSaneEntrySize", maxSaneEntrySize)
.kv("readEntrySize", entrySize).toString());
}
return entrySize;
}

/**
 * Reads the entry at {@code offset}: first its size via {@link #readEntrySizeAt(int)},
 * then that many bytes starting at {@code offset}.
 */
@Override
public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
    return readBufferAt(offset, readEntrySizeAt(offset));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public interface LogReader extends AutoCloseable {
*/
long readLongAt(long offset) throws IOException, EOFException;

/**
 * Read the size of an entry at a given offset.
 * The size is stored at {@code offset - Integer.BYTES}.
 *
 * @param offset offset of the entry; its size field occupies the {@code Integer.BYTES} bytes before it
 * @return the entry size read from {@code offset - Integer.BYTES}
 */
int readEntrySizeAt(int offset) throws IOException, EOFException;

/**
* Read an entry at a given offset.
* The size of the entry must be at (offset - Integer.BYTES).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ public long getLocation(long ledgerId, long entryId) throws IOException {
return getLedgerStorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, entryId);
}

/**
 * Routes the bounded read to the per-directory storage that owns {@code ledgerId}.
 */
@Override
public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
    SingleDirectoryDbLedgerStorage owner = getLedgerStorage(ledgerId);
    return owner.getEntryIfFits(ledgerId, entryId, maxEntrySize);
}

private SingleDirectoryDbLedgerStorage getLedgerStorage(long ledgerId) {
return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs));
}
Expand Down
Loading
Loading