Skip to content

Commit 07af143

Browse files
authored
[improve][test] Remove EntryCacheCreator from ManagedLedgerFactoryImpl (#24552)
1 parent 86b036b commit 07af143

4 files changed

Lines changed: 37 additions & 59 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
2222
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
2323
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
24-
import com.google.common.annotations.VisibleForTesting;
2524
import com.google.common.base.Predicates;
2625
import com.google.common.collect.BoundType;
2726
import com.google.common.collect.Maps;
@@ -48,7 +47,6 @@
4847
import java.util.concurrent.ScheduledExecutorService;
4948
import java.util.concurrent.ScheduledFuture;
5049
import java.util.concurrent.TimeUnit;
51-
import java.util.function.Function;
5250
import java.util.function.Supplier;
5351
import java.util.stream.Collectors;
5452
import lombok.Getter;
@@ -110,7 +108,6 @@
110108
import org.apache.pulsar.metadata.api.Stat;
111109
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
112110
import org.apache.pulsar.metadata.api.extended.SessionEvent;
113-
import org.jspecify.annotations.Nullable;
114111
import org.slf4j.Logger;
115112
import org.slf4j.LoggerFactory;
116113

@@ -173,7 +170,7 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi
173170
ManagedLedgerFactoryConfig config)
174171
throws Exception {
175172
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
176-
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(), null);
173+
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
177174
}
178175

179176
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
@@ -187,21 +184,12 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper
187184
this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper), config);
188185
}
189186

190-
@VisibleForTesting
191-
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
192-
@Nullable Function<ManagedLedgerFactoryImpl, EntryCacheManager>
193-
entryCacheManagerCreator)
194-
throws Exception {
195-
this(metadataStore, __ -> CompletableFuture.completedFuture(bookKeeper), false,
196-
new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE, OpenTelemetry.noop(),
197-
entryCacheManagerCreator);
198-
}
199-
200187
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
201188
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
202189
ManagedLedgerFactoryConfig config)
203190
throws Exception {
204-
this(metadataStore, bookKeeperGroupFactory, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
191+
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
192+
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
205193
}
206194

207195
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
@@ -210,18 +198,15 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
210198
OpenTelemetry openTelemetry)
211199
throws Exception {
212200
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
213-
config, statsLogger, openTelemetry, null);
201+
config, statsLogger, openTelemetry);
214202
}
215203

216204
private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
217205
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
218206
boolean isBookkeeperManaged,
219207
ManagedLedgerFactoryConfig config,
220208
StatsLogger statsLogger,
221-
OpenTelemetry openTelemetry,
222-
@Nullable Function<ManagedLedgerFactoryImpl, EntryCacheManager>
223-
entryCacheManagerCreator)
224-
throws Exception {
209+
OpenTelemetry openTelemetry) throws Exception {
225210
MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
226211
config.getCompressionConfigForManagedLedgerInfo();
227212
MetadataCompressionConfig compressionConfigForManagedCursorInfo =
@@ -243,11 +228,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
243228
compressionConfigForManagedCursorInfo);
244229
this.config = config;
245230
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
246-
if (entryCacheManagerCreator == null) {
247-
this.entryCacheManager = new RangeEntryCacheManagerImpl(this, scheduledExecutor, openTelemetry);
248-
} else {
249-
this.entryCacheManager = entryCacheManagerCreator.apply(this);
250-
}
231+
this.entryCacheManager = new RangeEntryCacheManagerImpl(this, scheduledExecutor, openTelemetry);
251232
this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
252233
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
253234
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -440,9 +440,8 @@ private long getAvgEntrySize() {
440440
* @param shouldCacheEntry if we should put the entry into the cache
441441
* @return a handle to the operation
442442
*/
443-
@VisibleForTesting
444-
protected CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
445-
boolean shouldCacheEntry) {
443+
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
444+
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
446445
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
447446
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
448447
.thenApply(

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.Collections;
4848
import java.util.HashMap;
4949
import java.util.HashSet;
50+
import java.util.Iterator;
5051
import java.util.List;
5152
import java.util.Map;
5253
import java.util.Optional;
@@ -79,6 +80,8 @@
7980
import org.apache.bookkeeper.client.BookKeeper.DigestType;
8081
import org.apache.bookkeeper.client.LedgerEntry;
8182
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
83+
import org.apache.bookkeeper.client.PulsarMockReadHandleInterceptor;
84+
import org.apache.bookkeeper.client.api.LedgerEntries;
8285
import org.apache.bookkeeper.client.api.ReadHandle;
8386
import org.apache.bookkeeper.common.util.OrderedExecutor;
8487
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -100,12 +103,11 @@
100103
import org.apache.bookkeeper.mledger.ScanOutcome;
101104
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
102105
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
103-
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
104-
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
105106
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
106107
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
107108
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
108109
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
110+
import org.apache.commons.collections.iterators.EmptyIterator;
109111
import org.apache.commons.lang3.mutable.MutableBoolean;
110112
import org.apache.pulsar.common.api.proto.CommandSubscribe;
111113
import org.apache.pulsar.common.api.proto.IntRange;
@@ -139,7 +141,7 @@ public static Object[][] useOpenRangeSet() {
139141

140142
@AfterMethod
141143
public void afterMethod() {
142-
setEntryCacheCreator(null);
144+
bkc.setReadHandleInterceptor(null);
143145
}
144146

145147
@Test
@@ -5413,16 +5415,32 @@ public void operationFailed(ManagedLedgerException exception) {
54135415
@Test(timeOut = 10000)
54145416
public void testReadNoEntries() throws Exception {
54155417
final var firstRead = new AtomicBoolean(true);
5416-
setEntryCacheCreator(ml -> new RangeEntryCacheImpl((RangeEntryCacheManagerImpl) factory.getEntryCacheManager(),
5417-
ml, factory.getConfig().isCopyEntriesInCache()) {
5418-
5418+
bkc.setReadHandleInterceptor(new PulsarMockReadHandleInterceptor() {
54195419
@Override
5420-
protected CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
5421-
boolean shouldCacheEntry) {
5420+
public CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry,
5421+
LedgerEntries entries) {
54225422
if (firstRead.compareAndSet(true, false)) {
5423-
return CompletableFuture.completedFuture(List.of());
5423+
// LedgerEntriesImpl doesn't allow empty entries list.
5424+
// Implementing a dummy LedgerEntries that returns an empty list.
5425+
return CompletableFuture.completedFuture(new LedgerEntries() {
5426+
@Override
5427+
public org.apache.bookkeeper.client.api.LedgerEntry getEntry(long entryId) {
5428+
return null;
5429+
}
5430+
5431+
@Override
5432+
public Iterator<org.apache.bookkeeper.client.api.LedgerEntry> iterator() {
5433+
return EmptyIterator.INSTANCE;
5434+
}
5435+
5436+
@Override
5437+
public void close() {
5438+
5439+
}
5440+
});
5441+
} else {
5442+
return CompletableFuture.completedFuture(entries);
54245443
}
5425-
return super.readFromStorage(lh, firstEntry, lastEntry, shouldCacheEntry);
54265444
}
54275445
});
54285446
final var ml = factory.open("testReadNoEntries");

managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,17 @@
1818
*/
1919
package org.apache.bookkeeper.test;
2020

21-
import io.opentelemetry.api.OpenTelemetry;
2221
import java.lang.reflect.Method;
2322
import java.util.Optional;
2423
import java.util.concurrent.ExecutorService;
2524
import java.util.concurrent.Executors;
2625
import java.util.concurrent.TimeUnit;
27-
import java.util.function.Function;
2826
import lombok.SneakyThrows;
2927
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
3028
import org.apache.bookkeeper.common.util.OrderedScheduler;
3129
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3230
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
3331
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
34-
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
35-
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
36-
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
3732
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
3833
import org.apache.pulsar.metadata.api.MetadataStoreException;
3934
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -62,7 +57,6 @@ public abstract class MockedBookKeeperTestCase {
6257
protected ExecutorService cachedExecutor;
6358

6459
protected FaultInjectionMetadataStore metadataStore;
65-
private Function<ManagedLedgerImpl, EntryCache> entryCacheCreator = null;
6660

6761
public MockedBookKeeperTestCase() {
6862
// By default start a 3 bookies cluster
@@ -90,17 +84,7 @@ public final void setUp(Method method) throws Exception {
9084

9185
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
9286
initManagedLedgerFactoryConfig(managedLedgerFactoryConfig);
93-
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, __ -> new RangeEntryCacheManagerImpl(__,
94-
__.getScheduledExecutor(), OpenTelemetry.noop()) {
95-
96-
@Override
97-
public EntryCache getEntryCache(ManagedLedgerImpl ml) {
98-
if (entryCacheCreator != null) {
99-
return entryCacheCreator.apply(ml);
100-
}
101-
return super.getEntryCache(ml);
102-
}
103-
});
87+
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
10488

10589
setUpTestCase();
10690
}
@@ -180,8 +164,4 @@ protected void stopBookKeeper() {
180164
protected void stopMetadataStore() {
181165
metadataStore.setAlwaysFail(new MetadataStoreException("failed"));
182166
}
183-
184-
protected void setEntryCacheCreator(Function<ManagedLedgerImpl, EntryCache> entryCacheCreator) {
185-
this.entryCacheCreator = entryCacheCreator;
186-
}
187167
}

0 commit comments

Comments
 (0)