diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 1fa565d6ec788..16fffeb88f37b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -135,6 +135,12 @@ public CursorAlreadyClosedException(String msg) { } } + public static class CancelledException extends ManagedLedgerException { + public CancelledException(String msg) { + super(msg); + } + } + public static class TooManyRequestsException extends ManagedLedgerException { public TooManyRequestsException(String msg) { super(msg); @@ -211,4 +217,10 @@ public synchronized Throwable fillInStackTrace() { // Disable stack traces to be filled in return null; } + + public static boolean shouldNotRead(ManagedLedgerException exception) { + return exception instanceof ConcurrentWaitCallbackException + || exception instanceof ManagedLedgerException.CursorAlreadyClosedException + || exception instanceof ManagedLedgerException.CancelledException; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f74e834e8be1e..d50368257c9db 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1076,8 +1076,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re ctx, maxPosition, skipCondition, true); int opReadId = op.id; if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { - op.recycle(); - callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx); + op.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException()); return; } @@ -1188,7 +1187,8 @@ public boolean cancelPendingReadRequest() { return null; }); if (op != null) { - op.recycle(); + final var msg = op.toString(); + op.readEntriesFailed(new ManagedLedgerException.CancelledException(msg + " is cancelled")); } return op != null && op != OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR; } @@ -2962,7 +2962,7 @@ protected void closeWaitingCursor() { OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this, OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR); if (opReadEntry != null && opReadEntry != OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) { - opReadEntry.readEntriesFailed(new CursorAlreadyClosedException("Cursor is closing"), opReadEntry.ctx); + opReadEntry.readEntriesFailed(new CursorAlreadyClosedException("Cursor is closing")); } } @@ -3532,7 +3532,7 @@ void notifyEntriesAvailable() { log.debug("[{}] [{}] Cursor is already closed, ignoring notification", ledger.getName(), name); } opReadEntry.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException( - "Cursor was already closed"), opReadEntry.ctx); + "Cursor was already closed")); return; } PENDING_READ_OPS_UPDATER.incrementAndGet(this); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index d157677d2105e..86831318734b4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -29,8 +29,6 @@ import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; @@ -92,7 +90,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; @@ -322,18 +319,14 @@ public boolean isFenced() { protected final ManagedLedgerMBeanImpl mbean; protected final Clock clock; - private static final AtomicLongFieldUpdater READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater - .newUpdater(ManagedLedgerImpl.class, "readOpCount"); - private volatile long readOpCount = 0; protected static final AtomicLongFieldUpdater ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater .newUpdater(ManagedLedgerImpl.class, "addOpCount"); private volatile long addOpCount = 0; - // last read-operation's callback to check read-timeout on it. - private volatile ReadEntryCallbackWrapper lastReadCallback = null; - private static final AtomicReferenceFieldUpdater - LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater - .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback"); + private volatile PendingReadEntriesOp lastReadEntriesOp = null; + private static final AtomicReferenceFieldUpdater + LAST_READ_ENTRIES_OP = AtomicReferenceFieldUpdater + .newUpdater(ManagedLedgerImpl.class, PendingReadEntriesOp.class, "lastReadEntriesOp"); /** * Queue of pending entries to be added to the managed ledger. Typically, entries are queued when a new ledger is. @@ -2064,7 +2057,7 @@ void clearPendingAddEntries(ManagedLedgerException e) { void asyncReadEntries(OpReadEntry opReadEntry) { final State state = STATE_UPDATER.get(this); if (state.isFenced() || state == State.Closed) { - opReadEntry.readEntriesFailed(new ManagedLedgerFencedException(), opReadEntry.ctx); + opReadEntry.readEntriesFailed(new ManagedLedgerFencedException()); return; } @@ -2105,8 +2098,7 @@ && isLedgerFullyAcked(ledgerId, ledgerInfo, opReadEntry.cursor)) { -> { log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition, ex.getMessage()); - opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), - opReadEntry.ctx); + opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause())); return null; }); } @@ -2395,179 +2387,44 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, lastEntry); } - asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); + asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry); } protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) { mbean.addEntriesRead(1); + final var future = entryCache.asyncReadEntry(ledger, position); + future.whenComplete((entry, throwable) -> { + if (throwable == null) { + callback.readEntryComplete(entry, ctx); + } else { + callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx); + } + }); if (config.getReadEntryTimeoutSeconds() > 0) { - // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled - long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); long createdTime = System.nanoTime(); - ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(), - position.getEntryId(), callback, readOpCount, createdTime, ctx); - lastReadCallback = readCallback; - entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount); - } else { - entryCache.asyncReadEntry(ledger, position, callback, ctx); + lastReadEntriesOp = new PendingReadEntriesOp(position.getLedgerId(), position.getEntryId(), createdTime, + future); } } - protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry, - Object ctx) { + protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry) { IntSupplier expectedReadCount = opReadEntry.cursor::getNumberOfCursorsAtSamePositionOrBefore; - if (config.getReadEntryTimeoutSeconds() > 0) { - // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled - long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); - long createdTime = System.nanoTime(); - ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, - opReadEntry, readOpCount, createdTime, ctx); - lastReadCallback = readCallback; - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount); - } else { - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, opReadEntry, ctx); - } - } - - static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback { - - volatile ReadEntryCallback readEntryCallback; - volatile ReadEntriesCallback readEntriesCallback; - String name; - long ledgerId; - long entryId; - volatile long readOpCount = -1; - private static final AtomicLongFieldUpdater READ_OP_COUNT_UPDATER = - AtomicLongFieldUpdater.newUpdater(ReadEntryCallbackWrapper.class, "readOpCount"); - volatile long createdTime = -1; - volatile Object cntx; - - final Handle recyclerHandle; - - private ReadEntryCallbackWrapper(Handle recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } - - static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback, - long readOpCount, long createdTime, Object ctx) { - ReadEntryCallbackWrapper readCallback = RECYCLER.get(); - readCallback.name = name; - readCallback.ledgerId = ledgerId; - readCallback.entryId = entryId; - readCallback.readEntryCallback = callback; - readCallback.cntx = ctx; - readCallback.readOpCount = readOpCount; - readCallback.createdTime = createdTime; - return readCallback; - } - - static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback, - long readOpCount, long createdTime, Object ctx) { - ReadEntryCallbackWrapper readCallback = RECYCLER.get(); - readCallback.name = name; - readCallback.ledgerId = ledgerId; - readCallback.entryId = entryId; - readCallback.readEntriesCallback = callback; - readCallback.cntx = ctx; - readCallback.readOpCount = readOpCount; - readCallback.createdTime = createdTime; - return readCallback; - } - - @Override - public void readEntryComplete(Entry entry, Object ctx) { - long reOpCount = reOpCount(ctx); - ReadEntryCallback callback = this.readEntryCallback; - Object cbCtx = this.cntx; - if (recycle(reOpCount)) { - callback.readEntryComplete(entry, cbCtx); + final var future = entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount); + future.whenComplete((entries, throwable) -> { + if (throwable == null) { + opReadEntry.readEntriesComplete(entries); } else { - if (log.isDebugEnabled()) { - log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId); - } - entry.release(); + opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable)); } + }); + if (config.getReadEntryTimeoutSeconds() > 0) { + long createdTime = System.nanoTime(); + lastReadEntriesOp = new PendingReadEntriesOp(ledger.getId(), firstEntry, createdTime, future); } + } - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - long reOpCount = reOpCount(ctx); - ReadEntryCallback callback = this.readEntryCallback; - Object cbCtx = this.cntx; - if (recycle(reOpCount)) { - callback.readEntryFailed(exception, cbCtx); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId); - } - } - } - - @Override - public void readEntriesComplete(List returnedEntries, Object ctx) { - long reOpCount = reOpCount(ctx); - ReadEntriesCallback callback = this.readEntriesCallback; - Object cbCtx = this.cntx; - if (recycle(reOpCount)) { - callback.readEntriesComplete(returnedEntries, cbCtx); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId); - } - returnedEntries.forEach(Entry::release); - } - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - long reOpCount = reOpCount(ctx); - ReadEntriesCallback callback = this.readEntriesCallback; - Object cbCtx = this.cntx; - if (recycle(reOpCount)) { - callback.readEntriesFailed(exception, cbCtx); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId); - } - } - } - - private long reOpCount(Object ctx) { - return (ctx instanceof Long) ? (long) ctx : -1; - } - - public void readFailed(ManagedLedgerException exception, Object ctx) { - if (readEntryCallback != null) { - readEntryFailed(exception, ctx); - } else if (readEntriesCallback != null) { - readEntriesFailed(exception, ctx); - } - // It happens when timeout-thread and read-callback both recycles at the same time. - // this read-callback has already been recycled so, do nothing.. - } - - private boolean recycle(long readOpCount) { - if (readOpCount != -1 - && READ_OP_COUNT_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, readOpCount, -1)) { - createdTime = -1; - readEntryCallback = null; - readEntriesCallback = null; - ledgerId = -1; - entryId = -1; - name = null; - recyclerHandle.recycle(this); - return true; - } - return false; - } - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected ReadEntryCallbackWrapper newObject(Handle handle) { - return new ReadEntryCallbackWrapper(handle); - } - }; - + record PendingReadEntriesOp(long ledgerId, long entryId, long createdTime, + CompletableFuture future) { } @Override @@ -4549,15 +4406,14 @@ private void checkReadTimeout() { if (timeoutSec < 1) { return; } - ReadEntryCallbackWrapper callback = this.lastReadCallback; - long readOpCount = callback != null ? callback.readOpCount : 0; - boolean timeout = callback != null && (TimeUnit.NANOSECONDS - .toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec); - if (readOpCount > 0 && timeout) { - log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, this.lastReadCallback.ledgerId, - this.lastReadCallback.entryId, timeoutSec); - callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount); - LAST_READ_CALLBACK_UPDATER.compareAndSet(this, callback, null); + final var lastReadEntriesOp = this.lastReadEntriesOp; + if (lastReadEntriesOp != null && TimeUnit.NANOSECONDS.toSeconds( + System.nanoTime() - lastReadEntriesOp.createdTime) >= timeoutSec) { + log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, lastReadEntriesOp.ledgerId, + lastReadEntriesOp.entryId, timeoutSec); + lastReadEntriesOp.future.completeExceptionally(createManagedLedgerException( + BKException.Code.TimeoutException)); + LAST_READ_ENTRIES_OP.compareAndSet(this, lastReadEntriesOp, null); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index b618a25aa3d75..db319c38ecfef 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class OpReadEntry implements ReadEntriesCallback { +class OpReadEntry { static final OpReadEntry WAITING_READ_OP_FOR_CLOSED_CURSOR = new OpReadEntry(); private static final AtomicInteger opReadIdGenerator = new AtomicInteger(1); /** @@ -47,7 +47,7 @@ class OpReadEntry implements ReadEntriesCallback { Position readPosition; private int count; private ReadEntriesCallback callback; - Object ctx; + private Object ctx; // Results private List entries; @@ -110,32 +110,35 @@ private void internalReadEntriesComplete(List returnedEntries) { checkReadCompletion(); } - @Override - public void readEntriesComplete(List returnedEntries, Object ctx) { + public void readEntriesComplete(List returnedEntries) { try { internalReadEntriesComplete(returnedEntries); } catch (Throwable throwable) { log.error("[{}] Fallback to readEntriesFailed for exception in readEntriesComplete", this, throwable); - readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx); + readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable)); } } - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + public void readEntriesFailed(ManagedLedgerException exception) { try { - internalReadEntriesFailed(exception, ctx); + internalReadEntriesFailed(exception); } catch (Throwable throwable) { // At least we should complete the callback - fail(ManagedLedgerException.getManagedLedgerException(throwable), ctx); + fail(ManagedLedgerException.getManagedLedgerException(throwable)); } } - private void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) { + private void internalReadEntriesFailed(ManagedLedgerException exception) { cursor.readOperationCompleted(); + if (ManagedLedgerException.shouldNotRead(exception)) { + entries.forEach(Entry::release); + fail(exception); + return; + } if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them - complete(ctx); + complete(); } else if (!cursor.isClosed() && cursor.getConfig().isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) { log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(), @@ -153,7 +156,7 @@ private void internalReadEntriesFailed(ManagedLedgerException exception, Object } // fail callback if it couldn't find next valid ledger if (nexReadPosition == null) { - fail(exception, ctx); + fail(exception); return; } updateReadPosition(nexReadPosition); @@ -174,7 +177,7 @@ private void internalReadEntriesFailed(ManagedLedgerException exception, Object } } - fail(exception, ctx); + fail(exception); } } @@ -198,7 +201,7 @@ void checkReadCompletion() { try { cursor.readOperationCompleted(); } finally { - complete(ctx); + complete(); } } } @@ -236,7 +239,7 @@ protected OpReadEntry newObject(Recycler.Handle recyclerHandle) { } }; - public void recycle() { + private void recycle() { if (recyclerHandle == null) { // This is the no-op instance, do not recycle return; @@ -255,7 +258,7 @@ public void recycle() { recyclerHandle.recycle(this); } - private void complete(Object ctx) { + private void complete() { cursor.ledger.getExecutor().execute(() -> { try { callback.readEntriesComplete(entries, ctx); @@ -266,7 +269,7 @@ private void complete(Object ctx) { }); } - private void fail(ManagedLedgerException e, Object ctx) { + private void fail(ManagedLedgerException e) { try { callback.readEntriesFailed(e, ctx); cursor.ledger.mbean.recordReadEntriesError(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java index b2ebf7430560c..775812c26dc0f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java @@ -18,10 +18,10 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.IntSupplier; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; @@ -81,13 +81,10 @@ public interface EntryCache { * the last entry to read (inclusive) * @param expectedReadCount resolves the expected read count for the given entry. When the expected read count is * >0, the entry can be cached and reused later. - * @param callback - * the callback object that will be notified when read is done - * @param ctx - * the context object + * @return the future of entries */ - void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, - ReadEntriesCallback callback, Object ctx); + CompletableFuture> asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, + IntSupplier expectedReadCount); /** * Read entry at given position from the cache or from bookkeeper. @@ -98,12 +95,9 @@ void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier * the ledger handle * @param position * position to read the entry from - * @param callback - * the callback object that will be notified when read is done - * @param ctx - * the context object + * @return the future of the entry */ - void asyncReadEntry(ReadHandle lh, Position position, ReadEntryCallback callback, Object ctx); + CompletableFuture asyncReadEntry(ReadHandle lh, Position position); /** * Get the total size in bytes of all the entries stored in this cache. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index b5a45415a4fe1..a4e26fb2923f7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -22,10 +22,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.IntSupplier; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; @@ -68,9 +68,9 @@ public void clear() { } @Override - public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, - final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync( + public CompletableFuture> asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, + IntSupplier expectedReadCount) { + return ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenApplyAsync( ledgerEntries -> { List entries = new ArrayList<>(); long totalSize = 0; @@ -89,21 +89,18 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSu ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize); ml.getMbean().addReadEntriesSample(entries.size(), totalSize); - callback.readEntriesComplete(entries, ctx); - }, ml.getExecutor()).exceptionally(exception -> { - callback.readEntriesFailed(createManagedLedgerException(exception), ctx); - return null; - }); + return entries; + }, ml.getExecutor()); } @Override - public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback, - Object ctx) { + public CompletableFuture asyncReadEntry(ReadHandle lh, Position position) { + final var future = new CompletableFuture(); ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync( (ledgerEntries, exception) -> { if (exception != null) { ml.invalidateLedgerHandle(lh); - callback.readEntryFailed(createManagedLedgerException(exception), ctx); + future.completeExceptionally(createManagedLedgerException(exception)); return; } @@ -116,15 +113,15 @@ public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.Read ml.getMbean().recordReadEntriesOpsCacheMisses(1, returnEntry.getLength()); ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength()); ml.getMbean().addReadEntriesSample(1, returnEntry.getLength()); - callback.readEntryComplete(returnEntry, ctx); + future.complete(returnEntry); } else { - callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), - ctx); + future.completeExceptionally(new ManagedLedgerException("Could not read given position")); } } finally { ledgerEntries.close(); } }, ml.getExecutor()); + return future; } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java index 5f904f3cf8420..23325f650c4ae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl.cache; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import com.google.common.annotations.VisibleForTesting; import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.Collections; @@ -31,7 +32,6 @@ import java.util.function.IntSupplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.EntryImpl; @@ -132,7 +132,7 @@ PendingReadKey reminderOnRight(PendingReadKey other) { } - private record ReadEntriesCallbackWithContext(AsyncCallbacks.ReadEntriesCallback callback, Object ctx, + private record ReadEntriesCallbackWithContext(CompletableFuture> future, long startEntry, long endEntry) { } @@ -248,12 +248,11 @@ public synchronized void attach(CompletableFuture> handle) { }); } - synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback, - Object ctx, long startEntry, long endEntry) { + synchronized boolean addListener(CompletableFuture> entriesFuture, long startEntry, long endEntry) { if (state == PendingReadState.COMPLETED) { return false; } - listeners.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry)); + listeners.add(new ReadEntriesCallbackWithContext(entriesFuture, startEntry, endEntry)); return true; } @@ -276,15 +275,13 @@ private void readEntriesComplete(List callbacks, if (first.startEntry == key.startEntry && first.endEntry == key.endEntry) { // perfect match, no copy, this is the most common case - first.callback.readEntriesComplete(entriesToReturn, first.ctx); + first.future.complete(entriesToReturn); } else { - first.callback.readEntriesComplete( - keepEntries(entriesToReturn, first.startEntry, first.endEntry), first.ctx); + first.future.complete(keepEntries(entriesToReturn, first.startEntry, first.endEntry)); } } else { for (ReadEntriesCallbackWithContext callback : callbacks) { - callback.callback.readEntriesComplete( - copyEntries(entriesToReturn, callback.startEntry, callback.endEntry), callback.ctx); + callback.future.complete(copyEntries(entriesToReturn, callback.startEntry, callback.endEntry)); } for (Entry entry : entriesToReturn) { // don't decrease the read count when these entries are released @@ -298,7 +295,7 @@ private void readEntriesComplete(List callbacks, private void readEntriesFailed(List callbacks, Throwable error) { for (ReadEntriesCallbackWithContext callback : callbacks) { ManagedLedgerException mlException = createManagedLedgerException(error); - callback.callback.readEntriesFailed(mlException, callback.ctx); + callback.future.completeExceptionally(mlException); } } @@ -328,9 +325,10 @@ private static List copyEntries(List entriesToReturn, long startEn } } - - void readEntries(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, - final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { + @VisibleForTesting + CompletableFuture> readEntries(ReadHandle lh, long firstEntry, long lastEntry, + IntSupplier expectedReadCount) { + final var readFuture = new CompletableFuture>(); final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry); ConcurrentMap pendingReadsForLedger = @@ -345,8 +343,7 @@ void readEntries(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier exp if (findBestCandidateOutcome.needsAdditionalReads()) { CompletableFuture> readFromMidFuture = new CompletableFuture<>(); - ReadEntriesCallback presentReadCallback = new ReadEntriesCallback(readFromMidFuture); - listenerAdded = pendingRead.addListener(presentReadCallback, ctx, key.startEntry, key.endEntry); + listenerAdded = pendingRead.addListener(readFromMidFuture, key.startEntry, key.endEntry); if (!listenerAdded) { continue; } @@ -367,16 +364,16 @@ void readEntries(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier exp }) .whenComplete((finalResult, e) -> { if (e != null) { - callback.readEntriesFailed(createManagedLedgerException(e), ctx); + readFuture.completeExceptionally(e); releaseEntriesSafely(readFromLeftFuture); releaseEntriesSafely(readFromMidFuture); releaseEntriesSafely(readFromRightFuture); } else { - callback.readEntriesComplete(finalResult, ctx); + readFuture.complete(finalResult); } }); } else { - listenerAdded = pendingRead.addListener(callback, ctx, key.startEntry, key.endEntry); + listenerAdded = pendingRead.addListener(readFuture, key.startEntry, key.endEntry); } if (createdByThisThread.get()) { @@ -385,21 +382,18 @@ void readEntries(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier exp pendingRead.attach(readResult); } } + return readFuture; } private CompletableFuture> recursiveReadMissingEntriesAsync(ReadHandle lh, IntSupplier expectedReadCount, PendingReadKey missingKey) { - CompletableFuture> future; if (missingKey != null) { - future = new CompletableFuture<>(); - ReadEntriesCallback callback = new ReadEntriesCallback(future); - rangeEntryCache.asyncReadEntry0(lh, missingKey.startEntry, missingKey.endEntry, expectedReadCount, callback, - null, false); + return rangeEntryCache.asyncReadEntry0(lh, missingKey.startEntry, missingKey.endEntry, expectedReadCount, + false); } else { - future = CompletableFuture.completedFuture(Collections.emptyList()); + return CompletableFuture.completedFuture(Collections.emptyList()); } - return future; } private void releaseEntriesSafely(CompletableFuture> future) { @@ -417,22 +411,4 @@ void invalidateLedger(long id) { cachedPendingReads.remove(id); } - static class ReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback { - - private final CompletableFuture> completableFuture; - - public ReadEntriesCallback(CompletableFuture> completableFuture) { - this.completableFuture = completableFuture; - } - - @Override - public void readEntriesComplete(List entries, Object ctx) { - completableFuture.complete(entries); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - completableFuture.completeExceptionally(exception); - } - } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index fd391ba2bf6b0..b07f55575c3a6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -36,8 +36,6 @@ import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -230,65 +228,63 @@ public void invalidateAllEntries(long ledgerId) { } @Override - public void asyncReadEntry(ReadHandle lh, Position position, final ReadEntryCallback callback, - final Object ctx) { + public CompletableFuture asyncReadEntry(ReadHandle lh, Position position) { try { + final var future = new CompletableFuture(); asyncReadEntriesByPosition(lh, position, position, 1, - () -> DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY ? 1 : 0, - new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { + () -> DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY ? 1 : 0, true + ).whenComplete((entries, throwable) -> { + if (throwable == null) { if (entries.isEmpty()) { - callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), ctx); + future.completeExceptionally(new ManagedLedgerException("Could not read given position")); } else { - callback.readEntryComplete(entries.get(0), ctx); + future.complete(entries.get(0)); } + } else { + future.completeExceptionally(ManagedLedgerException.getManagedLedgerException(throwable)); } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - callback.readEntryFailed(exception, ctx); - } - }, ctx, true); + }); + return future; } catch (Throwable t) { log.warn("failed to read entries for {}-{}", lh.getId(), position, t); // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt // (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from // the bookie) invalidateAllEntries(lh.getId()); - callback.readEntryFailed(createManagedLedgerException(t), ctx); + return CompletableFuture.failedFuture(createManagedLedgerException(t)); } } @Override - public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, - final ReadEntriesCallback callback, Object ctx) { + public CompletableFuture> asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, + IntSupplier expectedReadCount) { try { - asyncReadEntry0(lh, firstEntry, lastEntry, expectedReadCount, callback, ctx, true); + return asyncReadEntry0(lh, firstEntry, lastEntry, expectedReadCount, true); } catch (Throwable t) { log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t); // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt // (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from // the bookie) invalidateAllEntries(lh.getId()); - callback.readEntriesFailed(createManagedLedgerException(t), ctx); + return CompletableFuture.failedFuture(t); } } @SuppressWarnings({ "unchecked", "rawtypes" }) - void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, - final ReadEntriesCallback callback, Object ctx, boolean acquirePermits) { + CompletableFuture> asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, + IntSupplier expectedReadCount, + boolean acquirePermits) { final long ledgerId = lh.getId(); final int numberOfEntries = (int) (lastEntry - firstEntry) + 1; final Position firstPosition = PositionFactory.create(ledgerId, firstEntry); final Position lastPosition = PositionFactory.create(ledgerId, lastEntry); - asyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount, callback, ctx, + return asyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount, acquirePermits); } - void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, - IntSupplier expectedReadCount, final ReadEntriesCallback originalCallback, - Object ctx, boolean acquirePermits) { + CompletableFuture> asyncReadEntriesByPosition( + ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, + IntSupplier expectedReadCount, boolean acquirePermits) { checkArgument(firstPosition.getLedgerId() == lastPosition.getLedgerId(), "Invalid range. Entries %s and %s should be in the same ledger.", firstPosition, lastPosition); @@ -303,8 +299,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter(); if (!acquirePermits || pendingReadsLimiter.isDisabled()) { - doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount, - originalCallback, ctx); + return doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount); } else { long estimatedEntrySize = getEstimatedEntrySize(lh); long estimatedReadSize = numberOfEntries * estimatedEntrySize; @@ -313,27 +308,34 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position estimatedReadSize, numberOfEntries, estimatedEntrySize); } + final var future = new CompletableFuture>(); Optional optionalHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle -> { // permits were not immediately available, callback will be executed when permits are acquired // or timeout - ml.getExecutor().execute(() -> { - doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, lastPosition, numberOfEntries, - expectedReadCount, originalCallback, ctx, handle, estimatedReadSize); - }); + ml.getExecutor().execute(() -> doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, + lastPosition, numberOfEntries, expectedReadCount, handle, estimatedReadSize + ).whenComplete((entries, e) -> { + if (e == null) { + future.complete(entries); + } else { + future.completeExceptionally(e); + } + })); }); // permits were immediately available and acquired if (optionalHandle.isPresent()) { - doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, lastPosition, numberOfEntries, - expectedReadCount, originalCallback, ctx, optionalHandle.get(), estimatedReadSize); + return doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, lastPosition, numberOfEntries, + expectedReadCount, optionalHandle.get(), estimatedReadSize); + } else { + return future; // will be completed by `pendingReadsLimiter` } } } - void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position firstPosition, Position lastPosition, - int numberOfEntries, IntSupplier expectedReadCount, - final ReadEntriesCallback originalCallback, Object ctx, - InflightReadsLimiter.Handle handle, long estimatedReadSize) { + CompletableFuture> doAsyncReadEntriesWithAcquiredPermits( + ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, + IntSupplier expectedReadCount, InflightReadsLimiter.Handle handle, long estimatedReadSize) { if (!handle.success()) { String message = String.format( "Couldn't acquire enough permits on the max reads in flight limiter to read from ledger " @@ -342,42 +344,31 @@ void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position firstPosition + "managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis and " + "managedLedgerMaxReadsInFlightSizeInMB)", lh.getId(), getName(), estimatedReadSize, numberOfEntries); - originalCallback.readEntriesFailed(new ManagedLedgerException.TooManyRequestsException(message), ctx); - return; + return CompletableFuture.failedFuture(new ManagedLedgerException.TooManyRequestsException(message)); } InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter(); - ReadEntriesCallback wrappedCallback = new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx2) { - if (!entries.isEmpty()) { - // release permits only when entries have been handled - AtomicInteger remainingCount = new AtomicInteger(entries.size()); - for (Entry entry : entries) { - ((EntryImpl) entry).onDeallocate(() -> { - if (remainingCount.decrementAndGet() <= 0) { - pendingReadsLimiter.release(handle); - } - }); - } - } else { - pendingReadsLimiter.release(handle); - } - originalCallback.readEntriesComplete(entries, ctx2); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx2) { + return doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, + expectedReadCount + ).whenComplete((entries, e) -> { + if (e != null || entries.isEmpty()) { pendingReadsLimiter.release(handle); - originalCallback.readEntriesFailed(exception, ctx2); + return; + } + // release permits only when entries have been handled + AtomicInteger remainingCount = new AtomicInteger(entries.size()); + for (Entry entry : entries) { + ((EntryImpl) entry).onDeallocate(() -> { + if (remainingCount.decrementAndGet() <= 0) { + pendingReadsLimiter.release(handle); + } + }); } - }; - doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount, - wrappedCallback, ctx); + }); } - void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, - IntSupplier expectedReadCount, final ReadEntriesCallback callback, - Object ctx) { + CompletableFuture> doAsyncReadEntriesByPosition( + ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, + IntSupplier expectedReadCount) { Collection cachedEntries; if (firstPosition.compareTo(lastPosition) == 0) { ReferenceCountedEntry cachedEntry = entries.get(firstPosition); @@ -411,7 +402,7 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio } if (cachedEntries.size() == numberOfEntries) { - callback.readEntriesComplete(entriesToReturn, ctx); + return CompletableFuture.completedFuture(entriesToReturn); } else { // read missing ranges long firstEntryInRange = -1; @@ -423,19 +414,17 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio } } else { if (firstEntryInRange != -1) { - futures.add( - readMissingEntriesAsync(lh, firstEntryInRange, firstPosition.getEntryId() + i - 1, - expectedReadCount, ctx)); + futures.add(pendingReadsManager.readEntries(lh, firstEntryInRange, + firstPosition.getEntryId() + i - 1, expectedReadCount)); firstEntryInRange = -1; } } } if (firstEntryInRange != -1) { - futures.add( - readMissingEntriesAsync(lh, firstEntryInRange, lastPosition.getEntryId(), - expectedReadCount, ctx)); + futures.add(pendingReadsManager.readEntries(lh, firstEntryInRange, lastPosition.getEntryId(), + expectedReadCount)); } - FutureUtil.waitForAll(futures).whenComplete((__, t) -> { + return FutureUtil.waitForAll(futures).handle((__, t) -> { if (t != null) { // release cached entries placed in entriesToReturn for (Entry entry : entriesToReturn) { @@ -456,9 +445,7 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio } log.warn("Failed to read missing entries from bookkeeper, retrying by reading all", t); // Read all the entries from bookkeeper - pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), lastPosition.getEntryId(), - expectedReadCount, callback, ctx); - return; + return Optional.>empty(); } for (CompletableFuture> future : futures) { List readEntries = future.getNow(null); @@ -474,25 +461,19 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio } } } - callback.readEntriesComplete(entriesToReturn, ctx); - }); + return Optional.of(entriesToReturn); + }).thenCompose(optEntries -> optEntries + .map(CompletableFuture::completedFuture) + .orElseGet(() -> pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), + lastPosition.getEntryId(), expectedReadCount))); } } else { // Read all the entries from bookkeeper - pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), lastPosition.getEntryId(), - expectedReadCount, callback, ctx); + return pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), lastPosition.getEntryId(), + expectedReadCount); } } - private CompletableFuture> readMissingEntriesAsync(ReadHandle lh, - long firstEntry, long lastEntry, - IntSupplier expectedReadCount, Object ctx) { - CompletableFuture> future = new CompletableFuture<>(); - PendingReadsManager.ReadEntriesCallback callback = new PendingReadsManager.ReadEntriesCallback(future); - pendingReadsManager.readEntries(lh, firstEntry, lastEntry, expectedReadCount, callback, ctx); - return future; - } - @VisibleForTesting public long getEstimatedEntrySize(ReadHandle lh) { if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 49743d9386c90..b019b483c61fb 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -29,15 +29,12 @@ import java.util.List; import java.util.Optional; import java.util.Random; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; @@ -47,7 +44,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.awaitility.Awaitility; -import org.testng.Assert; import org.testng.annotations.Test; public class EntryCacheManagerTest extends MockedBookKeeperTestCase { @@ -408,23 +404,11 @@ void entryCacheDisabledAsyncReadEntry() throws Exception { EntryCacheManager cacheManager = factory.getEntryCacheManager(); EntryCache entryCache = cacheManager.getEntryCache(ml1); - final CountDownLatch counter = new CountDownLatch(1); when(ml1.getLastConfirmedEntry()).thenReturn(PositionFactory.create(1L, 1L)); when(ml1.getOptionalLedgerInfo(lh.getId())).thenReturn(Optional.of(mock( MLDataFormats.ManagedLedgerInfo.LedgerInfo.class))); - entryCache.asyncReadEntry(lh, PositionFactory.create(1L, 1L), new AsyncCallbacks.ReadEntryCallback() { - public void readEntryComplete(Entry entry, Object ctx) { - Assert.assertNotEquals(entry, null); - entry.release(); - counter.countDown(); - } - - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - Assert.fail("should not have failed"); - counter.countDown(); - } - }, null); - counter.await(); + final var entry = entryCache.asyncReadEntry(lh, PositionFactory.create(1L, 1L)).get(); + entry.release(); verify(lh).readUnconfirmedAsync(anyLong(), anyLong()); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index 0fe889a2da5ec..255fcd105458a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -41,7 +41,6 @@ import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -255,19 +254,7 @@ static ReadHandle getLedgerHandle() { private List readEntry(EntryCache entryCache, ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, Consumer assertion) throws InterruptedException { - final var future = new CompletableFuture>(); - entryCache.asyncReadEntry(lh, firstEntry, lastEntry, expectedReadCount, - new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - future.complete(entries); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); + final var future = entryCache.asyncReadEntry(lh, firstEntry, lastEntry, expectedReadCount); try { final var entries = future.get(); assertNull(assertion); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java index 01a8808a2b96c..63c3c668aba64 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java @@ -21,7 +21,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.UUID; @@ -30,10 +29,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; @@ -137,10 +134,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { doAnswer(answer).when(spyCurrentLedger).readUnconfirmedAsync(anyLong(), anyLong()); // Initialize "entryCache.estimatedEntrySize" to the correct value. - Object ctx = new Object(); - SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback(); - entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, () -> 1, cb0, ctx); - cb0.entries.join(); + releaseEntries(entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, () -> 1).get()); int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue(); Awaitility.await().untilAsserted(() -> { long remainingBytes = limiter.getRemainingBytes(); @@ -149,19 +143,30 @@ public void testPreciseLimitation(String missingCase) throws Exception { log.info("remainingBytes 0: {}", limiter.getRemainingBytes()); // Concurrency reading. - - SimpleReadEntriesCallback cb1 = new SimpleReadEntriesCallback(); - SimpleReadEntriesCallback cb2 = new SimpleReadEntriesCallback(); + final var future1 = new CompletableFuture>(); threadFactory.newThread(() -> { - entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, () -> 1, cb1, ctx); + entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, () -> 1).whenComplete((entries, throwable) -> { + if (throwable == null) { + future1.complete(entries); + } else { + future1.completeExceptionally(throwable); + } + }); }).start(); + final var future2 = new CompletableFuture>(); threadFactory.newThread(() -> { try { firstReadingStarted.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } - entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, () -> 1, cb2, ctx); + entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, () -> 1).whenComplete((entries, throwable) -> { + if (throwable == null) { + future2.complete(entries); + } else { + future2.completeExceptionally(throwable); + } + }); }).start(); long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry); @@ -176,7 +181,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { // Complete the read1. Thread.sleep(3000); readCompleteSignal1.countDown(); - cb1.entries.join(); + releaseEntries(future1.get()); long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry); long remainingBytesExpected2 = totalCapacity - bytesAcquired2; log.info("acquired : {}", bytesAcquired2); @@ -187,7 +192,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { }); readCompleteSignal2.countDown(); - cb2.entries.join(); + releaseEntries(future2.get()); Awaitility.await().untilAsserted(() -> { long remainingBytes = limiter.getRemainingBytes(); log.info("remainingBytes 2: {}", remainingBytes); @@ -202,24 +207,10 @@ private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntry return entriesCount * perEntrySize; } - class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback { - - CompletableFuture> entries = new CompletableFuture<>(); - - @Override - public void readEntriesComplete(List entriesRead, Object ctx) { - List list = new ArrayList<>(entriesRead.size()); - for (Entry entry : entriesRead) { - byte b = entry.getDataBuffer().readByte(); - list.add(b); - entry.release(); - } - this.entries.complete(list); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - this.entries.completeExceptionally(exception); + // The permits will only be released after entries are released + private void releaseEntries(List entries) { + for (Entry entry : entries) { + entry.release(); } } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 23b739987f4c9..ade93a84abce3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -20,8 +20,6 @@ import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; @@ -59,7 +57,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -74,7 +71,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Cleanup; @@ -123,7 +119,6 @@ import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.awaitility.Awaitility; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -4405,62 +4400,6 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { Awaitility.await().untilAsserted(() -> assertTrue(flag.get())); } - @Test - public void testOpReadEntryRecycle() throws Exception { - final Map opReadEntryToRecycleCount = new ConcurrentHashMap<>(); - final Supplier createOpReadEntry = () -> { - final OpReadEntry mockedOpReadEntry = mock(OpReadEntry.class); - doAnswer(__ -> opReadEntryToRecycleCount.computeIfAbsent(mockedOpReadEntry, - ignored -> new AtomicInteger(0)).getAndIncrement() - ).when(mockedOpReadEntry).recycle(); - return mockedOpReadEntry; - }; - - @Cleanup final MockedStatic mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class); - mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), - any(), any(), any(), anyBoolean())).thenAnswer(__ -> createOpReadEntry.get()); - - final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig(); - ledgerConfig.setNewEntriesCheckDelayInMillis(10); - final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", ledgerConfig); - final ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my_cursor"); - final List exceptions = new ArrayList<>(); - final AtomicBoolean readEntriesSuccess = new AtomicBoolean(false); - final ReadEntriesCallback callback = new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - readEntriesSuccess.set(true); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - exceptions.add(exception); - } - }; - - final int numReadRequests = 3; - for (int i = 0; i < numReadRequests; i++) { - cursor.asyncReadEntriesOrWait(1, callback, null, PositionFactory.create(0, 0)); - } - Awaitility.await().atMost(Duration.ofSeconds(1)) - .untilAsserted(() -> assertEquals(ledger.waitingCursors.size(), 1)); - assertTrue(cursor.cancelPendingReadRequest()); - - ledger.addEntry(new byte[1]); - Awaitility.await().atMost(Duration.ofSeconds(1)) - .untilAsserted(() -> assertTrue(ledger.waitingCursors.isEmpty())); - assertFalse(readEntriesSuccess.get()); - - assertEquals(exceptions.size(), numReadRequests - 1); - exceptions.forEach(e -> assertEquals(e.getMessage(), "We can only have a single waiting callback")); - assertEquals(opReadEntryToRecycleCount.size(), 3); - assertEquals(opReadEntryToRecycleCount.entrySet().stream() - .map(Map.Entry::getValue) - .map(AtomicInteger::get) - .collect(Collectors.toList()), - Arrays.asList(1, 1, 1)); - } - @Test public void testLazyCursorLedgerCreation() throws Exception { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); @@ -4650,7 +4589,7 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception { }) .when(ledger) .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), - Mockito.anyLong(), Mockito.any(), Mockito.any()); + Mockito.anyLong(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4756,7 +4695,7 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws }) .when(ledger) .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), - Mockito.anyLong(), Mockito.any(), Mockito.any()); + Mockito.anyLong(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java index 9c6c79eb95b40..91e2026a3be3a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java @@ -27,8 +27,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertSame; -import static org.testng.Assert.assertTrue; import io.opentelemetry.api.OpenTelemetry; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -45,20 +43,13 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.IntSupplier; -import java.util.stream.Collectors; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.tuple.Pair; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -67,8 +58,6 @@ @Slf4j public class PendingReadsManagerTest { - static final Object CTX = "foo"; - static final Object CTX2 = "far"; static final long LEDGER_ID = 123414L; private final Map, AtomicInteger> entryRangeReadCount = new ConcurrentHashMap<>(); ExecutorService orderedExecutor; @@ -107,21 +96,13 @@ void setupMocks() { mock(ScheduledExecutorService.class), OpenTelemetry.noop()); when(rangeEntryCache.getPendingReadsLimiter()).thenReturn(inflighReadsLimiter); pendingReadsManager = new PendingReadsManager(rangeEntryCache); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - log.info("rangeEntryCache asyncReadEntry0 {}", invocationOnMock); - ReadHandle rh = invocationOnMock.getArgument(0); - long startEntry = invocationOnMock.getArgument(1); - long endEntry = invocationOnMock.getArgument(2); - IntSupplier expectedReadCount = invocationOnMock.getArgument(3); - AsyncCallbacks.ReadEntriesCallback callback = invocationOnMock.getArgument(4); - Object ctx = invocationOnMock.getArgument(5); - pendingReadsManager.readEntries(lh, startEntry, endEntry, expectedReadCount, callback, ctx); - return null; - } - }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(), - any(), any(), any(), anyBoolean()); + doAnswer(invocationOnMock -> { + log.info("rangeEntryCache asyncReadEntry0 {}", invocationOnMock); + long startEntry = invocationOnMock.getArgument(1); + long endEntry = invocationOnMock.getArgument(2); + IntSupplier expectedReadCount = invocationOnMock.getArgument(3); + return pendingReadsManager.readEntries(lh, startEntry, endEntry, expectedReadCount); + }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(), any(), anyBoolean()); lh = mock(ReadHandle.class); ml = mock(ManagedLedgerImpl.class); @@ -130,32 +111,6 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { entryRangeReadCount.clear(); } - - @Data - private static class CapturingReadEntriesCallback extends CompletableFuture - implements AsyncCallbacks.ReadEntriesCallback { - List entries; - Object ctx; - Throwable error; - - @Override - public synchronized void readEntriesComplete(List entries, Object ctx) { - this.entries = entries.stream().map(Entry::getPosition).collect(Collectors.toList()); - this.ctx = ctx; - this.error = null; - this.complete(null); - } - - @Override - public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - this.entries = null; - this.ctx = ctx; - this.error = exception; - this.completeExceptionally(exception); - } - - } - private static List buildList(long start, long end) { List result = new ArrayList<>(); for (long i = start; i <= end; i++) { @@ -167,7 +122,7 @@ private static List buildList(long start, long end) { } - private void verifyRange(List entries, long firstEntry, long endEntry) { + private void verifyRange(List entries, long firstEntry, long endEntry) { int pos = 0; log.info("verifyRange numEntries {}", entries.size()); for (long entry = firstEntry; entry <= endEntry; entry++) { @@ -225,18 +180,13 @@ public void simpleRead() throws Exception { PreparedReadFromStorage read1 = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); - CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); + final var future = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); // complete the read read1.storageReadCompleted(); // wait for the callback to complete - callback.get(); - assertSame(callback.getCtx(), CTX); - - // verify - verifyRange(callback.entries, firstEntry, endEntry); + verifyRange(future.get(), firstEntry, endEntry); } @@ -251,28 +201,21 @@ public void simpleConcurrentReadPerfectMatch() throws Exception { prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); - CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); - - CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback2, CTX2); + final var future1 = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); + final var future2 = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); // complete the read from BK // only one read completes 2 callbacks read1.storageReadCompleted(); - callback.get(); - callback2.get(); - - assertSame(callback.getCtx(), CTX); - assertSame(callback2.getCtx(), CTX2); - - verifyRange(callback.entries, firstEntry, endEntry); - verifyRange(callback2.entries, firstEntry, endEntry); + final var entries1 = future1.get(); + final var entries2 = future2.get(); + verifyRange(entries1, firstEntry, endEntry); + verifyRange(entries2, firstEntry, endEntry); int pos = 0; for (long entry = firstEntry; entry <= endEntry; entry++) { - assertTrue(callback.entries.get(pos).compareTo(callback2.entries.get(pos)) == 0); + assertEquals(entries1.get(pos).getPosition().compareTo(entries2.get(pos).getPosition()), 0); pos++; } @@ -293,32 +236,24 @@ public void simpleConcurrentReadIncluding() throws Exception { prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); - CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); - - - CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, - CTX2); + final var future1 = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); + final var future2 = pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, + expectedReadCount); // complete the read from BK // only one read completes 2 callbacks read1.storageReadCompleted(); - callback.get(); - callback2.get(); - - assertSame(callback.getCtx(), CTX); - assertSame(callback2.getCtx(), CTX2); - - verifyRange(callback.entries, firstEntry, endEntry); - verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + final var entries1 = future1.get(); + final var entries2 = future2.get(); + verifyRange(entries1, firstEntry, endEntry); + verifyRange(entries2, firstEntrySecondRead, endEntrySecondRead); int pos = 0; for (long entry = firstEntry; entry <= endEntry; entry++) { if (entry >= firstEntrySecondRead && entry <= endEntrySecondRead) { int posInSecondList = (int) (pos - (firstEntrySecondRead - firstEntry)); - assertTrue(callback.entries.get(pos).compareTo(callback2.entries.get(posInSecondList)) == 0); + assertEquals(entries1.get(pos).getPosition().compareTo(entries2.get(posInSecondList).getPosition()), 0); } pos++; } @@ -343,26 +278,20 @@ public void simpleConcurrentReadMissingLeft() throws Exception { prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); - CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); - - CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, - CTX2); + final var future1 = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); + final var future2 = pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, + expectedReadCount); // complete the read from BK read1.storageReadCompleted(); // the first read can move forward - callback.get(); + final var entries1 = future1.get(); readForLeft.storageReadCompleted(); - callback2.get(); - - assertSame(callback.getCtx(), CTX); - assertSame(callback2.getCtx(), CTX2); + final var entries2 = future2.get(); - verifyRange(callback.entries, firstEntry, endEntry); - verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + verifyRange(entries1, firstEntry, endEntry); + verifyRange(entries2, firstEntrySecondRead, endEntrySecondRead); } @@ -384,26 +313,20 @@ public void simpleConcurrentReadMissingRight() throws Exception { prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); - CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); - - CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, - CTX2); + final var future1 = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); + final var future2 = pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, + expectedReadCount); // complete the read from BK read1.storageReadCompleted(); // the first read can move forward - callback.get(); + final var entries1 = future1.get(); readForRight.storageReadCompleted(); - callback2.get(); - - assertSame(callback.getCtx(), CTX); - assertSame(callback2.getCtx(), CTX2); + final var entries2 = future2.get(); - verifyRange(callback.entries, firstEntry, endEntry); - verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + verifyRange(entries1, firstEntry, endEntry); + verifyRange(entries2, firstEntrySecondRead, endEntrySecondRead); } @@ -428,27 +351,21 @@ public void simpleConcurrentReadMissingBoth() throws Exception { prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); - CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); - - CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, - CTX2); + final var future1 = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); + final var future2 = pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, + expectedReadCount); // complete the read from BK read1.storageReadCompleted(); // the first read can move forward - callback.get(); + final var entries1 = future1.get(); readForLeft.storageReadCompleted(); readForRight.storageReadCompleted(); - callback2.get(); + final var entries2 = future2.get(); - assertSame(callback.getCtx(), CTX); - assertSame(callback2.getCtx(), CTX2); - - verifyRange(callback.entries, firstEntry, endEntry); - verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + verifyRange(entries1, firstEntry, endEntry); + verifyRange(entries2, firstEntrySecondRead, endEntrySecondRead); } @@ -471,35 +388,26 @@ public void simpleConcurrentReadNoMatch() throws Exception { expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); - CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); - - CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, - CTX2); + final var future1 = pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount); + final var future2 = pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, + expectedReadCount); read1.storageReadCompleted(); - callback.get(); + final var entries1 = future1.get(); read2.storageReadCompleted(); - callback2.get(); - - assertSame(callback.getCtx(), CTX); - assertSame(callback2.getCtx(), CTX2); + final var entries2 = future2.get(); - verifyRange(callback.entries, firstEntry, endEntry); - verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + verifyRange(entries1, firstEntry, endEntry); + verifyRange(entries2, firstEntrySecondRead, endEntrySecondRead); } @Test public void concurrentReadOnOverlappedEntryRanges() throws Exception { - final var readFutures = new ArrayList(); - final BiConsumer readEntries = (firstEntry, lastEntry) -> { - final var callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, lastEntry, () -> 0, callback, CTX); - readFutures.add(callback); - }; + final var readFutures = new ArrayList>>(); + final BiConsumer readEntries = (firstEntry, lastEntry) -> + readFutures.add(pendingReadsManager.readEntries(lh, firstEntry, lastEntry, () -> 0)); final BiFunction mockReadFromStorage = (firstEntry, lastEntry) -> prepareReadFromStorage(lh, rangeEntryCache, firstEntry, lastEntry, () -> 0); @@ -512,15 +420,15 @@ public void concurrentReadOnOverlappedEntryRanges() throws Exception { read1.storageReadCompleted(); readFutures.get(1).get(1, TimeUnit.SECONDS); - assertEquals(readFutures.get(1).getEntries().size(), 21); + assertEquals(readFutures.get(1).get().size(), 21); read0.storageReadCompleted(); readFutures.get(0).get(1, TimeUnit.SECONDS); - assertEquals(readFutures.get(0).getEntries().size(), 61); + assertEquals(readFutures.get(0).get().size(), 61); read2.storageReadCompleted(); readFutures.get(2).get(1, TimeUnit.SECONDS); - assertEquals(readFutures.get(2).getEntries().size(), 91); + assertEquals(readFutures.get(2).get().size(), 91); log.info("entryRangeReadCount: {}", entryRangeReadCount); final var keys = Set.of(Pair.of(10L, 70L), Pair.of(71L, 79L), diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java index 2922a3d267982..3f23d0f95e465 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java @@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.IntSupplier; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.EntryImpl; @@ -73,15 +72,12 @@ public void setup() { doAnswer(invocation -> { long firstEntry = invocation.getArgument(1); long lastEntry = invocation.getArgument(2); - AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(4); - Object ctx = invocation.getArgument(5); List entries = new ArrayList<>((int) (lastEntry - firstEntry + 1)); for (long entryId = firstEntry; entryId <= lastEntry; entryId++) { entries.add(EntryImpl.create(1, entryId, Unpooled.EMPTY_BUFFER)); } - callback.readEntriesComplete(entries, ctx); - return null; - }).when(pendingReadsManager).readEntries(any(), anyLong(), anyLong(), any(), any(), any()); + return CompletableFuture.completedFuture(entries); + }).when(pendingReadsManager).readEntries(any(), anyLong(), anyLong(), any()); rangeEntryCache = new RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false, mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT, pendingReadsManager); @@ -95,8 +91,8 @@ public void testPartialCachingWithMiddleEntryInCache() { Entry entry = EntryImpl.create(1, 50, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(49L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(51L), eq(99L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(49L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(51L), eq(99L), any()); } @Test @@ -104,7 +100,7 @@ public void testPartialCachingWithFirstEntryInCache() { Entry entry = EntryImpl.create(1, 0, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(1L), eq(99L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(1L), eq(99L), any()); } @Test @@ -112,7 +108,7 @@ public void testPartialCachingWithLastEntryInCache() { Entry entry = EntryImpl.create(1, 99, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(98L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(98L), any()); } @Test @@ -122,8 +118,8 @@ public void testPartialCachingWithMiddleRangeInCache() { entry = EntryImpl.create(1, 51, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(49L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(52L), eq(99L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(49L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(52L), eq(99L), any()); } @Test @@ -133,7 +129,7 @@ public void testPartialCachingWithFirstRangeInCache() { entry = EntryImpl.create(1, 1, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(2L), eq(99L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(2L), eq(99L), any()); } @Test @@ -143,7 +139,7 @@ public void testPartialCachingWithLastRangeInCache() { EntryImpl.create(1, 99, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(97L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(97L), any()); } @Test @@ -159,11 +155,11 @@ public void testPartialCachingWithMultipleEntriesInCache() { entry = EntryImpl.create(1, 78, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(4L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(6L), eq(14L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(16L), eq(74L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(77L), eq(77L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(79L), eq(99L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(4L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(6L), eq(14L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(16L), eq(74L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(77L), eq(77L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(79L), eq(99L), any()); } @Test @@ -171,31 +167,17 @@ public void testPartialCachingWithMultipleEntriesInCacheWhilePartialReadFails() Entry entry = EntryImpl.create(1, 50, Unpooled.EMPTY_BUFFER); rangeEntryCache.insert(entry); doAnswer(invocation -> { - AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(4); - Object ctx = invocation.getArgument(5); System.out.println("Injecting test failure for readEntries"); - callback.readEntriesFailed(new ManagedLedgerException("Injected test failure"), ctx); - return null; - }).when(pendingReadsManager).readEntries(any(), eq(51L), eq(99L), any(), any(), any()); + return CompletableFuture.failedFuture(new ManagedLedgerException("Injected test failure")); + }).when(pendingReadsManager).readEntries(any(), eq(51L), eq(99L), any()); performReadAndValidateResult(); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(49L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(51L), eq(99L), any(), any(), any()); - verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(99L), any(), any(), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(49L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(51L), eq(99L), any()); + verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(99L), any()); } private void performReadAndValidateResult() { - CompletableFuture> future = new CompletableFuture<>(); - rangeEntryCache.asyncReadEntry(lh, 0, 99, expectedReadCount, new AsyncCallbacks.ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - future.complete(entries); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); + final var future = rangeEntryCache.asyncReadEntry(lh, 0, 99, expectedReadCount); assertThat(future).isCompleted().satisfies(f -> { List entries = f.getNow(null); assertThat(entries).hasSize(100); @@ -206,4 +188,4 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } }); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 0f496e461b85c..f8c5e2e21688e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -860,10 +860,9 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj ReadType readType = (ReadType) ctx; long waitTimeMillis = readFailureBackoff.next(); - // Do not keep reading more entries if the cursor is already closed. - if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) { + if (ManagedLedgerException.shouldNotRead(exception)) { if (log.isDebugEnabled()) { - log.debug("[{}] Cursor is already closed, skipping read more entries", cursor.getName()); + log.debug("[{}] skipping read more entries due to {}", cursor.getName(), exception.getMessage()); } // Set the wait time to -1 to avoid rescheduling the read. waitTimeMillis = -1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index e809d984ae344..b1fa061530ff0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -35,7 +35,6 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; @@ -469,20 +468,13 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep Consumer c = readEntriesCtx.getConsumer(); readEntriesCtx.recycle(); - // Do not keep reading messages from a closed cursor. - if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) { + if (ManagedLedgerException.shouldNotRead(exception)) { if (log.isDebugEnabled()) { - log.debug("[{}] Cursor was already closed, skipping read more entries", cursor.getName()); + log.debug("[{}] skipping read more entries due to {}", cursor.getName(), exception.getMessage()); } return; } - if (exception instanceof ConcurrentWaitCallbackException) { - // At most one pending read request is allowed when there are no more entries, we should not trigger more - // read operations in this case and just wait the existing read operation completes. - return; - } - long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof NoMoreEntriesToReadException) {