Skip to content

Commit 5c5b44b

Browse files
[revert] "[improve][ml] Do not switch thread to execute asyncAddEntry's core logic (#23940)" (#23994)
1 parent 40b96de commit 5c5b44b

3 files changed

Lines changed: 31 additions & 90 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -802,41 +802,33 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
802802
buffer.retain();
803803

804804
// Jump to specific thread to avoid contention from writers writing from different threads
805-
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
806-
currentLedgerTimeoutTriggered);
807-
var added = false;
808-
try {
809-
// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
810-
// element in `pendingAddEntries`.
811-
synchronized (this) {
812-
if (managedLedgerInterceptor != null) {
813-
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
814-
}
815-
final var state = STATE_UPDATER.get(this);
816-
beforeAddEntryToQueue(state);
817-
pendingAddEntries.add(addOperation);
818-
added = true;
819-
afterAddEntryToQueue(state, addOperation);
820-
}
821-
} catch (Throwable throwable) {
822-
if (!added) {
823-
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
824-
} // else: all elements of `pendingAddEntries` will fail in another thread
825-
}
805+
executor.execute(() -> {
806+
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
807+
currentLedgerTimeoutTriggered);
808+
internalAsyncAddEntry(addOperation);
809+
});
826810
}
827811

828-
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
829-
if (state.isFenced()) {
830-
throw new ManagedLedgerFencedException();
812+
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
813+
if (!beforeAddEntry(addOperation)) {
814+
return;
831815
}
832-
switch (state) {
833-
case Terminated -> throw new ManagedLedgerTerminatedException("Managed ledger was already terminated");
834-
case Closed -> throw new ManagedLedgerAlreadyClosedException("Managed ledger was already closed");
835-
case WriteFailed -> throw new ManagedLedgerAlreadyClosedException("Waiting to recover from failure");
816+
final State state = STATE_UPDATER.get(this);
817+
if (state.isFenced()) {
818+
addOperation.failed(new ManagedLedgerFencedException());
819+
return;
820+
} else if (state == State.Terminated) {
821+
addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
822+
return;
823+
} else if (state == State.Closed) {
824+
addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
825+
return;
826+
} else if (state == State.WriteFailed) {
827+
addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
828+
return;
836829
}
837-
}
830+
pendingAddEntries.add(addOperation);
838831

839-
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
840832
if (state == State.ClosingLedger || state == State.CreatingLedger) {
841833
// We don't have a ready ledger to write into
842834
// We are waiting for a new ledger to be created

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -223,23 +223,25 @@ private void initLastConfirmedEntry() {
223223
}
224224

225225
@Override
226-
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
226+
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
227+
if (!beforeAddEntry(addOperation)) {
228+
return;
229+
}
227230
if (state != State.LedgerOpened) {
228-
throw new ManagedLedgerException("Managed ledger is not opened");
231+
addOperation.failed(new ManagedLedgerException("Managed ledger is not opened"));
232+
return;
229233
}
230-
}
231234

232-
@Override
233-
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
234235
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
235-
pendingAddEntries.poll();
236-
throw new ManagedLedgerException("Illegal addOperation context object.");
236+
addOperation.failed(new ManagedLedgerException("Illegal addOperation context object."));
237+
return;
237238
}
238239

239240
if (log.isDebugEnabled()) {
240241
log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})",
241242
name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId());
242243
}
244+
pendingAddEntries.add(addOperation);
243245
if (position.getLedgerId() <= currentLedger.getId()) {
244246
// Write into lastLedger
245247
if (position.getLedgerId() == currentLedger.getId()) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertNotEquals;
2323
import static org.testng.Assert.assertNotNull;
24-
import static org.testng.Assert.assertTrue;
2524

2625
import io.netty.buffer.ByteBuf;
2726
import io.netty.buffer.Unpooled;
@@ -30,12 +29,9 @@
3029
import java.util.Collections;
3130
import java.util.HashSet;
3231
import java.util.List;
33-
import java.util.Random;
3432
import java.util.Set;
3533
import java.util.concurrent.CompletableFuture;
3634
import java.util.concurrent.CountDownLatch;
37-
import java.util.concurrent.Executors;
38-
import java.util.concurrent.TimeUnit;
3935
import java.util.concurrent.atomic.AtomicInteger;
4036
import java.util.function.Predicate;
4137
import lombok.Cleanup;
@@ -503,53 +499,4 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
503499
ledger.close();
504500
}
505501

506-
@Test
507-
public void testBeforeAddEntry() throws Exception {
508-
final var interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
509-
final var config = new ManagedLedgerConfig();
510-
final var numEntries = 100;
511-
config.setMaxEntriesPerLedger(numEntries);
512-
config.setManagedLedgerInterceptor(interceptor);
513-
@Cleanup final var ml = (ManagedLedgerImpl) factory.open("test_concurrent_add_entry", config);
514-
515-
final var indexesBeforeAdd = new ArrayList<Long>();
516-
final var batchSizes = new ArrayList<Long>();
517-
final var random = new Random();
518-
final var latch = new CountDownLatch(numEntries);
519-
final var executor = Executors.newFixedThreadPool(3);
520-
final var lock = new Object(); // make sure `asyncAddEntry` are called in order
521-
for (int i = 0; i < numEntries; i++) {
522-
final var batchSize = random.nextInt(0, 100);
523-
final var msg = "msg-" + i;
524-
final var callback = new AsyncCallbacks.AddEntryCallback() {
525-
526-
@Override
527-
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
528-
latch.countDown();
529-
}
530-
531-
@Override
532-
public void addFailed(ManagedLedgerException exception, Object ctx) {
533-
log.error("Failed to add {}", msg, exception);
534-
latch.countDown();
535-
}
536-
};
537-
executor.execute(() -> {
538-
synchronized (lock) {
539-
batchSizes.add((long) batchSize);
540-
indexesBeforeAdd.add(interceptor.getIndex() + 1); // index is updated in each asyncAddEntry call
541-
ml.asyncAddEntry(Unpooled.wrappedBuffer(msg.getBytes()), batchSize, callback, null);
542-
}
543-
});
544-
}
545-
assertTrue(latch.await(3, TimeUnit.SECONDS));
546-
synchronized (lock) {
547-
for (int i = 1; i < numEntries; i++) {
548-
final var sum = batchSizes.get(i) + batchSizes.get(i - 1);
549-
batchSizes.set(i, sum);
550-
}
551-
assertEquals(indexesBeforeAdd.subList(1, numEntries), batchSizes.subList(0, numEntries - 1));
552-
}
553-
executor.shutdown();
554-
}
555502
}

0 commit comments

Comments
 (0)