Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,14 @@ public void run() {
updateLatency();
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
if (cb != null) {
final var interceptor = ml.getManagedLedgerInterceptor();
if (interceptor != null) {
try {
interceptor.afterAddEntry(lastEntry, data, ctx);
} catch (Throwable throwable) {
log.error("Failed afterAddEntry on {}", lastEntry, throwable);
}
}
cb.addComplete(lastEntry, data.asReadOnly(), ctx);
ml.notifyCursors();
ml.notifyWaitingEntryCallBacks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;

/**
* Interceptor for ManagedLedger.
Expand Down Expand Up @@ -61,6 +62,12 @@ interface AddEntryOperation {
*/
void beforeAddEntry(AddEntryOperation op, int numberOfMessages);

/**
* Called before the `asyncAddEntry` callback is called.
*/
default void afterAddEntry(Position position, ByteBuf data, Object ctx) {
}

/**
* Intercept When add entry failed.
* @param numberOfMessages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -45,6 +46,8 @@
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.nio.ReadOnlyBufferException;
Expand Down Expand Up @@ -130,6 +133,7 @@
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
Expand All @@ -139,6 +143,7 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -4593,4 +4598,91 @@ public void testRemoveLedgerProperty() throws Exception {
Assert.assertEquals(ml.getLedgersInfo().get(firstLedger).getPropertiesCount(), 0);
Assert.assertEquals(ml.getLedgersInfo().get(lastLedger).getPropertiesCount(), 0);
}

@Test
public void testInterceptor() throws Exception {
final var interceptor = new ManagedLedgerInterceptorImpl();
final var config = new ManagedLedgerConfig();
config.setManagedLedgerInterceptor(interceptor);
final var ml = (ManagedLedgerImpl) factory.open("test-interceptor", config);

final var context = new AddEntryContext();
final var future = bkc.promiseAfter(2);
CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() ->
future.completeExceptionally(new BKException.BKNotEnoughBookiesException()));
for (int i = 0; i < 5; i++) {
bkc.addEntryDelay(20, TimeUnit.MILLISECONDS);
}

for (int i = 0; i < 5; i++) {
final var value = "msg-" + i;
ml.asyncAddEntry(value.getBytes(), new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
log.info("complete {} with {}", value, position);
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.info("fail {} with {}", value, exception.getMessage());
}
}, context);
}
Thread.sleep(1000);
}

static class AddEntryContext {

long endOffset = 0;
}

static class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {

@Override
public void beforeAddEntry(AddEntryOperation op, int numberOfMessages) {
final var context = (AddEntryContext) op.getCtx();
long nextOffset = context.endOffset;
final var compositeBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
final var offsetBuffer = Unpooled.buffer(8);
offsetBuffer.writeLong(nextOffset);
compositeBuf.addComponents(offsetBuffer);
compositeBuf.addComponents(op.getData());
op.setData(compositeBuf);

context.endOffset = nextOffset + 1;
log.info("Updated endOffset to {}", context.endOffset);
}


@Override
public void afterAddEntry(Position position, ByteBuf data, Object ctx) {
final var context = (AddEntryContext) ctx;
final var buffer = (CompositeByteBuf) data;
final var valueBuffer = ((CompositeByteBuf) data).component(1);
final var valueBytes = new byte[valueBuffer.readableBytes()];
valueBuffer.readBytes(valueBytes);
final var offset = buffer.component(0).readLong();
final var value = new String(valueBytes, UTF_8);
log.info("Sent {} to {} (endOffset: {})", value, offset, context.endOffset);
}

@Override
public void afterFailedAddEntry(int numberOfMessages) {
// TODO: use context instead
}

@Override
public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
}

@Override
public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name,
LastEntryHandle lastEntryHandle) {
return CompletableFuture.completedFuture(null);
}

@Override
public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
}
}
}
Loading
Loading