Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ public CursorAlreadyClosedException(String msg) {
}
}

public static class CancelledException extends ManagedLedgerException {
public CancelledException(String msg) {
super(msg);
}
}

public static class TooManyRequestsException extends ManagedLedgerException {
public TooManyRequestsException(String msg) {
super(msg);
Expand Down Expand Up @@ -211,4 +217,10 @@ public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
return null;
}

public static boolean shouldNotRead(ManagedLedgerException exception) {
return exception instanceof ConcurrentWaitCallbackException
|| exception instanceof ManagedLedgerException.CursorAlreadyClosedException
|| exception instanceof ManagedLedgerException.CancelledException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1076,8 +1076,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re
ctx, maxPosition, skipCondition, true);
int opReadId = op.id;
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
op.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException());
return;
}

Expand Down Expand Up @@ -1188,7 +1187,8 @@ public boolean cancelPendingReadRequest() {
return null;
});
if (op != null) {
op.recycle();
final var msg = op.toString();
op.readEntriesFailed(new ManagedLedgerException.CancelledException(msg + " is cancelled"));
}
return op != null && op != OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR;
}
Expand Down Expand Up @@ -2962,7 +2962,7 @@ protected void closeWaitingCursor() {
OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this,
OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR);
if (opReadEntry != null && opReadEntry != OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
opReadEntry.readEntriesFailed(new CursorAlreadyClosedException("Cursor is closing"), opReadEntry.ctx);
opReadEntry.readEntriesFailed(new CursorAlreadyClosedException("Cursor is closing"));
}
}

Expand Down Expand Up @@ -3532,7 +3532,7 @@ void notifyEntriesAvailable() {
log.debug("[{}] [{}] Cursor is already closed, ignoring notification", ledger.getName(), name);
}
opReadEntry.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException(
"Cursor was already closed"), opReadEntry.ctx);
"Cursor was already closed"));
return;
}
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -92,7 +90,6 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
Expand Down Expand Up @@ -322,18 +319,14 @@ public boolean isFenced() {
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;

private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
protected static final AtomicLongFieldUpdater<ManagedLedgerImpl> ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "addOpCount");
private volatile long addOpCount = 0;

// last read-operation's callback to check read-timeout on it.
private volatile ReadEntryCallbackWrapper lastReadCallback = null;
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper>
LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
private volatile PendingReadEntriesOp lastReadEntriesOp = null;
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, PendingReadEntriesOp>
LAST_READ_ENTRIES_OP = AtomicReferenceFieldUpdater
.newUpdater(ManagedLedgerImpl.class, PendingReadEntriesOp.class, "lastReadEntriesOp");

/**
* Queue of pending entries to be added to the managed ledger. Typically, entries are queued when a new ledger is.
Expand Down Expand Up @@ -2064,7 +2057,7 @@ void clearPendingAddEntries(ManagedLedgerException e) {
void asyncReadEntries(OpReadEntry opReadEntry) {
final State state = STATE_UPDATER.get(this);
if (state.isFenced() || state == State.Closed) {
opReadEntry.readEntriesFailed(new ManagedLedgerFencedException(), opReadEntry.ctx);
opReadEntry.readEntriesFailed(new ManagedLedgerFencedException());
return;
}

Expand Down Expand Up @@ -2105,8 +2098,7 @@ && isLedgerFullyAcked(ledgerId, ledgerInfo, opReadEntry.cursor)) {
-> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
ex.getMessage());
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
opReadEntry.ctx);
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()));
return null;
});
}
Expand Down Expand Up @@ -2395,179 +2387,44 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
}
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry);
}

protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) {
mbean.addEntriesRead(1);
final var future = entryCache.asyncReadEntry(ledger, position);
future.whenComplete((entry, throwable) -> {
if (throwable == null) {
callback.readEntryComplete(entry, ctx);
} else {
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx);
}
});
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
position.getEntryId(), callback, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, position, callback, ctx);
lastReadEntriesOp = new PendingReadEntriesOp(position.getLedgerId(), position.getEntryId(), createdTime,
future);
}
}

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
Object ctx) {
protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry) {
IntSupplier expectedReadCount = opReadEntry.cursor::getNumberOfCursorsAtSamePositionOrBefore;
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, opReadEntry, ctx);
}
}

static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {

volatile ReadEntryCallback readEntryCallback;
volatile ReadEntriesCallback readEntriesCallback;
String name;
long ledgerId;
long entryId;
volatile long readOpCount = -1;
private static final AtomicLongFieldUpdater<ReadEntryCallbackWrapper> READ_OP_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(ReadEntryCallbackWrapper.class, "readOpCount");
volatile long createdTime = -1;
volatile Object cntx;

final Handle<ReadEntryCallbackWrapper> recyclerHandle;

private ReadEntryCallbackWrapper(Handle<ReadEntryCallbackWrapper> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback,
long readOpCount, long createdTime, Object ctx) {
ReadEntryCallbackWrapper readCallback = RECYCLER.get();
readCallback.name = name;
readCallback.ledgerId = ledgerId;
readCallback.entryId = entryId;
readCallback.readEntryCallback = callback;
readCallback.cntx = ctx;
readCallback.readOpCount = readOpCount;
readCallback.createdTime = createdTime;
return readCallback;
}

static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback,
long readOpCount, long createdTime, Object ctx) {
ReadEntryCallbackWrapper readCallback = RECYCLER.get();
readCallback.name = name;
readCallback.ledgerId = ledgerId;
readCallback.entryId = entryId;
readCallback.readEntriesCallback = callback;
readCallback.cntx = ctx;
readCallback.readOpCount = readOpCount;
readCallback.createdTime = createdTime;
return readCallback;
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
long reOpCount = reOpCount(ctx);
ReadEntryCallback callback = this.readEntryCallback;
Object cbCtx = this.cntx;
if (recycle(reOpCount)) {
callback.readEntryComplete(entry, cbCtx);
final var future = entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount);
future.whenComplete((entries, throwable) -> {
if (throwable == null) {
opReadEntry.readEntriesComplete(entries);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
entry.release();
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable));
}
});
if (config.getReadEntryTimeoutSeconds() > 0) {
long createdTime = System.nanoTime();
lastReadEntriesOp = new PendingReadEntriesOp(ledger.getId(), firstEntry, createdTime, future);
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
long reOpCount = reOpCount(ctx);
ReadEntryCallback callback = this.readEntryCallback;
Object cbCtx = this.cntx;
if (recycle(reOpCount)) {
callback.readEntryFailed(exception, cbCtx);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
}
}

@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
long reOpCount = reOpCount(ctx);
ReadEntriesCallback callback = this.readEntriesCallback;
Object cbCtx = this.cntx;
if (recycle(reOpCount)) {
callback.readEntriesComplete(returnedEntries, cbCtx);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
returnedEntries.forEach(Entry::release);
}
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
long reOpCount = reOpCount(ctx);
ReadEntriesCallback callback = this.readEntriesCallback;
Object cbCtx = this.cntx;
if (recycle(reOpCount)) {
callback.readEntriesFailed(exception, cbCtx);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
}
}

private long reOpCount(Object ctx) {
return (ctx instanceof Long) ? (long) ctx : -1;
}

public void readFailed(ManagedLedgerException exception, Object ctx) {
if (readEntryCallback != null) {
readEntryFailed(exception, ctx);
} else if (readEntriesCallback != null) {
readEntriesFailed(exception, ctx);
}
// It happens when timeout-thread and read-callback both recycles at the same time.
// this read-callback has already been recycled so, do nothing..
}

private boolean recycle(long readOpCount) {
if (readOpCount != -1
&& READ_OP_COUNT_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, readOpCount, -1)) {
createdTime = -1;
readEntryCallback = null;
readEntriesCallback = null;
ledgerId = -1;
entryId = -1;
name = null;
recyclerHandle.recycle(this);
return true;
}
return false;
}

private static final Recycler<ReadEntryCallbackWrapper> RECYCLER = new Recycler<ReadEntryCallbackWrapper>() {
@Override
protected ReadEntryCallbackWrapper newObject(Handle<ReadEntryCallbackWrapper> handle) {
return new ReadEntryCallbackWrapper(handle);
}
};

record PendingReadEntriesOp(long ledgerId, long entryId, long createdTime,
CompletableFuture<?> future) {
}

@Override
Expand Down Expand Up @@ -4549,15 +4406,14 @@ private void checkReadTimeout() {
if (timeoutSec < 1) {
return;
}
ReadEntryCallbackWrapper callback = this.lastReadCallback;
long readOpCount = callback != null ? callback.readOpCount : 0;
boolean timeout = callback != null && (TimeUnit.NANOSECONDS
.toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec);
if (readOpCount > 0 && timeout) {
log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, this.lastReadCallback.ledgerId,
this.lastReadCallback.entryId, timeoutSec);
callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
LAST_READ_CALLBACK_UPDATER.compareAndSet(this, callback, null);
final var lastReadEntriesOp = this.lastReadEntriesOp;
if (lastReadEntriesOp != null && TimeUnit.NANOSECONDS.toSeconds(
System.nanoTime() - lastReadEntriesOp.createdTime) >= timeoutSec) {
log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, lastReadEntriesOp.ledgerId,
lastReadEntriesOp.entryId, timeoutSec);
lastReadEntriesOp.future.completeExceptionally(createManagedLedgerException(
BKException.Code.TimeoutException));
LAST_READ_ENTRIES_OP.compareAndSet(this, lastReadEntriesOp, null);
}
}

Expand Down
Loading
Loading