Skip to content

Commit eea7c13

Browse files
[fix][ml] Fix asyncReadEntries might never complete if empty entries are read from BK (apache#24515)
1 parent 702c73c commit eea7c13

6 files changed

Lines changed: 168 additions & 37 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
2222
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
2323
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
24+
import com.google.common.annotations.VisibleForTesting;
2425
import com.google.common.base.Predicates;
2526
import com.google.common.collect.BoundType;
2627
import com.google.common.collect.Maps;
@@ -47,6 +48,7 @@
4748
import java.util.concurrent.ScheduledExecutorService;
4849
import java.util.concurrent.ScheduledFuture;
4950
import java.util.concurrent.TimeUnit;
51+
import java.util.function.Function;
5052
import java.util.function.Supplier;
5153
import java.util.stream.Collectors;
5254
import lombok.Getter;
@@ -108,6 +110,7 @@
108110
import org.apache.pulsar.metadata.api.Stat;
109111
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
110112
import org.apache.pulsar.metadata.api.extended.SessionEvent;
113+
import org.jspecify.annotations.Nullable;
111114
import org.slf4j.Logger;
112115
import org.slf4j.LoggerFactory;
113116

@@ -170,7 +173,7 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi
170173
ManagedLedgerFactoryConfig config)
171174
throws Exception {
172175
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
173-
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
176+
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(), null);
174177
}
175178

176179
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
@@ -184,12 +187,21 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper
184187
this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper), config);
185188
}
186189

190+
/**
 * Test-only constructor that builds a factory around a single, already-created {@link BookKeeper}
 * client and optionally lets the caller supply a custom {@link EntryCacheManager}.
 *
 * <p>It delegates to the private constructor with {@code false} for {@code isBookkeeperManaged},
 * a default {@link ManagedLedgerFactoryConfig}, {@code NullStatsLogger.INSTANCE} and a no-op
 * {@code OpenTelemetry}.
 *
 * @param metadataStore the metadata store passed through to the private constructor
 * @param bookKeeper the client returned (as an already-completed future) for every placement policy config
 * @param entryCacheManagerCreator factory applied to this instance to create the entry cache manager;
 *                                 when {@code null} the private constructor creates a
 *                                 {@code RangeEntryCacheManagerImpl} instead
 * @throws Exception propagated from the delegated constructor
 */
@VisibleForTesting
191+
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
192+
@Nullable Function<ManagedLedgerFactoryImpl, EntryCacheManager>
193+
entryCacheManagerCreator)
194+
throws Exception {
// The lambda ignores the placement-policy argument and always hands back the same client.
195+
this(metadataStore, __ -> CompletableFuture.completedFuture(bookKeeper), false,
196+
new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE, OpenTelemetry.noop(),
197+
entryCacheManagerCreator);
198+
}
199+
187200
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
188201
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
189202
ManagedLedgerFactoryConfig config)
190203
throws Exception {
191-
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
192-
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
204+
this(metadataStore, bookKeeperGroupFactory, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
193205
}
194206

195207
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
@@ -198,15 +210,18 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
198210
OpenTelemetry openTelemetry)
199211
throws Exception {
200212
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
201-
config, statsLogger, openTelemetry);
213+
config, statsLogger, openTelemetry, null);
202214
}
203215

204216
private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
205217
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
206218
boolean isBookkeeperManaged,
207219
ManagedLedgerFactoryConfig config,
208220
StatsLogger statsLogger,
209-
OpenTelemetry openTelemetry) throws Exception {
221+
OpenTelemetry openTelemetry,
222+
@Nullable Function<ManagedLedgerFactoryImpl, EntryCacheManager>
223+
entryCacheManagerCreator)
224+
throws Exception {
210225
MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
211226
config.getCompressionConfigForManagedLedgerInfo();
212227
MetadataCompressionConfig compressionConfigForManagedCursorInfo =
@@ -228,7 +243,11 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
228243
compressionConfigForManagedCursorInfo);
229244
this.config = config;
230245
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
231-
this.entryCacheManager = new RangeEntryCacheManagerImpl(this, scheduledExecutor, openTelemetry);
246+
if (entryCacheManagerCreator == null) {
247+
this.entryCacheManager = new RangeEntryCacheManagerImpl(this, scheduledExecutor, openTelemetry);
248+
} else {
249+
this.entryCacheManager = entryCacheManagerCreator.apply(this);
250+
}
232251
this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
233252
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
234253
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2245,8 +2245,9 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
22452245
// If all messages in [firstEntry...lastEntry] are filter out,
22462246
// then manual call internalReadEntriesComplete to advance read position.
22472247
if (firstValidEntry == -1L) {
2248-
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
2249-
PositionFactory.create(ledger.getId(), lastEntry));
2248+
final var nextReadPosition = PositionFactory.create(ledger.getId(), lastEntry).getNext();
2249+
opReadEntry.updateReadPosition(nextReadPosition);
2250+
opReadEntry.checkReadCompletion();
22502251
return;
22512252
}
22522253

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java

Lines changed: 78 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.util.Recycler;
2222
import io.netty.util.Recycler.Handle;
2323
import java.util.ArrayList;
24-
import java.util.Collections;
2524
import java.util.List;
2625
import java.util.function.Predicate;
2726
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -67,7 +66,12 @@ public static OpReadEntry create(ManagedCursorImpl cursor, Position readPosition
6766
return op;
6867
}
6968

70-
void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, Position lastPosition) {
69+
private void internalReadEntriesComplete(List<Entry> returnedEntries) {
70+
if (returnedEntries.isEmpty()) {
71+
log.warn("[{}] Read no entries unexpectedly", this);
72+
checkReadCompletion();
73+
return;
74+
}
7175
// Filter the returned entries for individual deleted messages
7276
int entriesCount = returnedEntries.size();
7377
long entriesSize = 0;
@@ -76,19 +80,15 @@ void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, Positi
7680
}
7781
cursor.updateReadStats(entriesCount, entriesSize);
7882

79-
if (entriesCount != 0) {
80-
lastPosition = returnedEntries.get(entriesCount - 1).getPosition();
81-
}
8283
if (log.isDebugEnabled()) {
8384
log.debug("[{}][{}] Read entries succeeded batch_size={} cumulative_size={} requested_count={}",
8485
cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count);
8586
}
8687

87-
List<Entry> filteredEntries = Collections.emptyList();
88-
if (entriesCount != 0) {
89-
filteredEntries = cursor.filterReadEntries(returnedEntries);
90-
entries.addAll(filteredEntries);
91-
}
88+
// Entries might be released after `filterReadEntries`, so retrieve the last position before that
89+
final var lastPosition = returnedEntries.get(entriesCount - 1).getPosition();
90+
final var filteredEntries = cursor.filterReadEntries(returnedEntries);
91+
entries.addAll(filteredEntries);
9292

9393
// if entries have been filtered out then try to skip reading of already deletedMessages in that range
9494
final Position nexReadPosition = entriesCount != filteredEntries.size()
@@ -99,19 +99,30 @@ void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, Positi
9999

100100
@Override
101101
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
102-
internalReadEntriesComplete(returnedEntries, ctx, null);
102+
try {
103+
internalReadEntriesComplete(returnedEntries);
104+
} catch (Throwable throwable) {
105+
log.error("[{}] Fallback to readEntriesFailed for exception in readEntriesComplete", this, throwable);
106+
readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx);
107+
}
103108
}
104109

105110
/**
 * Failure callback for the underlying read. The actual handling lives in
 * {@code internalReadEntriesFailed}; this wrapper only guarantees that an unexpected
 * throwable raised by that handling still results in the user callback being failed.
 *
 * @param exception the read failure reported to this operation
 * @param ctx the opaque context forwarded to the callback
 */
@Override
106111
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
112+
try {
113+
internalReadEntriesFailed(exception, ctx);
114+
} catch (Throwable throwable) {
115+
// At least we should complete the callback
// The secondary throwable (not the original exception) is what gets reported to the callback here.
116+
fail(ManagedLedgerException.getManagedLedgerException(throwable), ctx);
117+
}
118+
}
119+
120+
private void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
107121
cursor.readOperationCompleted();
108122

109123
if (!entries.isEmpty()) {
110124
// There were already some entries that were read before, we can return them
111-
cursor.ledger.getExecutor().execute(() -> {
112-
callback.readEntriesComplete(entries, ctx);
113-
recycle();
114-
});
125+
complete(ctx);
115126
} else if (cursor.getConfig().isAutoSkipNonRecoverableData()
116127
&& exception instanceof NonRecoverableLedgerException) {
117128
log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
@@ -129,9 +140,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
129140
}
130141
// fail callback if it couldn't find next valid ledger
131142
if (nexReadPosition == null) {
132-
callback.readEntriesFailed(exception, ctx);
133-
cursor.ledger.mbean.recordReadEntriesError();
134-
recycle();
143+
fail(exception, ctx);
135144
return;
136145
}
137146
updateReadPosition(nexReadPosition);
@@ -152,9 +161,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
152161
}
153162
}
154163

155-
callback.readEntriesFailed(exception, ctx);
156-
cursor.ledger.mbean.recordReadEntriesError();
157-
recycle();
164+
fail(exception, ctx);
158165
}
159166
}
160167

@@ -177,12 +184,8 @@ void checkReadCompletion() {
177184
// The reading was already completed, release resources and trigger callback
178185
try {
179186
cursor.readOperationCompleted();
180-
181187
} finally {
182-
cursor.ledger.getExecutor().execute(() -> {
183-
callback.readEntriesComplete(entries, ctx);
184-
recycle();
185-
});
188+
complete(ctx);
186189
}
187190
}
188191
}
@@ -217,5 +220,54 @@ public void recycle() {
217220
recyclerHandle.recycle(this);
218221
}
219222

223+
/**
 * Delivers the accumulated {@code entries} to the callback's {@code readEntriesComplete} on the
 * ledger's executor and then recycles this operation.
 *
 * <p>Any throwable raised inside the submitted task is logged rather than propagated.
 * NOTE(review): because {@code recycle()} sits inside the same {@code try}, it is skipped when the
 * callback throws.
 *
 * @param ctx the opaque context forwarded to the callback
 */
private void complete(Object ctx) {
// Run on the ledger executor rather than on the calling thread.
224+
cursor.ledger.getExecutor().execute(() -> {
225+
try {
226+
callback.readEntriesComplete(entries, ctx);
227+
recycle();
228+
} catch (Throwable throwable) {
229+
log.error("[{}] readEntriesComplete failed (last position: {})", this, lastEntryPosition(), throwable);
230+
}
231+
});
232+
}
233+
234+
/**
 * Reports a failed read: invokes the callback's {@code readEntriesFailed} on the calling thread,
 * records the error on the ledger's mbean, and recycles this operation, in that order.
 *
 * <p>Any throwable raised by those steps is logged rather than propagated, so the later steps
 * (including {@code recycle()}) are skipped if an earlier one throws.
 *
 * @param e the exception handed to the callback
 * @param ctx the opaque context forwarded to the callback
 */
private void fail(ManagedLedgerException e, Object ctx) {
235+
try {
236+
callback.readEntriesFailed(e, ctx);
237+
cursor.ledger.mbean.recordReadEntriesError();
238+
recycle();
239+
} catch (Throwable throwable) {
// Logs the message of the original failure plus the stack trace of the secondary throwable.
240+
log.error("[{}] readEntriesFailed failed (exception: {})", this, e.getMessage(), throwable);
241+
}
242+
}
243+
244+
/**
 * Builds a diagnostic description of this read operation for log messages: ledger name, cursor
 * name, the read / next-read / max positions, the number of entries collected so far and the
 * requested count. Null fields are rendered as {@code "(null)"}; if the cursor itself is null the
 * whole result is {@code "(null)"}.
 *
 * @return the description string
 */
@Override
245+
public String toString() {
// Snapshot the fields into locals so each null check and the read that follows it see the same
// value. NOTE(review): presumably this guards against the instance being recycled while the
// string is being built -- confirm.
246+
final var cursor = this.cursor;
247+
final var readPosition = this.readPosition;
248+
final var nextReadPosition = this.nextReadPosition;
249+
final var entries = this.entries;
250+
final var maxPosition = this.maxPosition;
251+
final var count = this.count;
252+
if (cursor != null) {
253+
return cursor.ledger.getName() + " " + cursor.getName() + "{ readPosition: "
254+
+ (readPosition != null ? readPosition : "(null)") + ", nextReadPosition: "
255+
+ (nextReadPosition != null ? nextReadPosition : "(null)") + ", maxPosition: "
256+
+ (maxPosition != null ? maxPosition : "(null)") + ", entries count: "
257+
+ (entries != null ? entries.size() : "(null)") + ", count: " + count + " }";
258+
} else {
259+
return "(null)";
260+
}
261+
}
262+
263+
/**
 * Describes the position of the last entry collected so far, for use in log messages.
 *
 * @return the last entry's position as a string, {@code "(empty)"} when no entries have been
 *         collected, or {@code "(null)"} when the entries list itself is null
 */
private String lastEntryPosition() {
// Read the field once so the null check and the accesses below use the same list reference.
264+
final var entries = this.entries;
265+
if (entries != null) {
266+
return entries.isEmpty() ? "(empty)" : entries.get(entries.size() - 1).getPosition().toString();
267+
} else {
268+
return "(null)";
269+
}
270+
}
271+
220272
private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class);
221273
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,8 +440,9 @@ private long getAvgEntrySize() {
440440
* @param shouldCacheEntry if we should put the entry into the cache
441441
* @return a handle to the operation
442442
*/
443-
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
444-
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
443+
@VisibleForTesting
444+
protected CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
445+
boolean shouldCacheEntry) {
445446
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
446447
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
447448
.thenApply(

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import java.util.function.Predicate;
7272
import java.util.function.Supplier;
7373
import java.util.stream.Collectors;
74+
import java.util.stream.IntStream;
7475
import lombok.Cleanup;
7576
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
7677
import org.apache.bookkeeper.client.BKException;
@@ -99,6 +100,8 @@
99100
import org.apache.bookkeeper.mledger.ScanOutcome;
100101
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
101102
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
103+
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
104+
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
102105
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
103106
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
104107
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
@@ -121,6 +124,7 @@
121124
import org.slf4j.Logger;
122125
import org.slf4j.LoggerFactory;
123126
import org.testng.Assert;
127+
import org.testng.annotations.AfterMethod;
124128
import org.testng.annotations.DataProvider;
125129
import org.testng.annotations.Test;
126130

@@ -133,6 +137,10 @@ public static Object[][] useOpenRangeSet() {
133137
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
134138
}
135139

140+
/**
 * Runs after every test method and clears the entry cache creator by passing {@code null}.
 * NOTE(review): presumably this restores the default cache so that an override installed by one
 * test (e.g. {@code testReadNoEntries}) does not leak into the next -- {@code setEntryCacheCreator}
 * is defined outside this view; confirm.
 */
@AfterMethod
141+
public void afterMethod() {
142+
setEntryCacheCreator(null);
143+
}
136144

137145
@Test
138146
public void testCloseCursor() throws Exception {
@@ -5402,6 +5410,36 @@ public void operationFailed(ManagedLedgerException exception) {
54025410
assertTrue(recovered.booleanValue());
54035411
}
54045412

5413+
/**
 * Regression test for a read that returns no entries from storage: the first call to
 * {@code readFromStorage} is forced to complete with an empty list, and the test then checks that
 * {@code cursor.readEntries(10)} still returns all ten messages in order. The {@code timeOut} on
 * the annotation makes the test fail instead of hanging if the read never completes.
 */
@Test(timeOut = 10000)
5414+
public void testReadNoEntries() throws Exception {
// Flipped to false by the first storage read, so only that one read is faked as empty.
5415+
final var firstRead = new AtomicBoolean(true);
// Install a cache whose storage read is overridden; later reads fall through to the real implementation.
5416+
setEntryCacheCreator(ml -> new RangeEntryCacheImpl((RangeEntryCacheManagerImpl) factory.getEntryCacheManager(),
5417+
ml, factory.getConfig().isCopyEntriesInCache()) {
5418+
5419+
@Override
5420+
protected CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
5421+
boolean shouldCacheEntry) {
5422+
if (firstRead.compareAndSet(true, false)) {
5423+
return CompletableFuture.completedFuture(List.of());
5424+
}
5425+
return super.readFromStorage(lh, firstEntry, lastEntry, shouldCacheEntry);
5426+
}
5427+
});
5428+
final var ml = factory.open("testReadNoEntries");
5429+
final var cursor = ml.openCursor("cursor");
5430+
cursor.setInactive(); // disable caching when adding entries
5431+
for (int i = 0; i < 10; i++) {
5432+
ml.addEntry(("msg-" + i).getBytes(StandardCharsets.UTF_8));
5433+
}
5434+
final var entries = cursor.readEntries(10);
// Decode each entry payload as UTF-8 and compare against msg-0 .. msg-9 in order.
5435+
assertEquals(entries.stream().map(e -> {
5436+
final var buffer = e.getDataBuffer();
5437+
final var bytes = new byte[buffer.readableBytes()];
5438+
buffer.readBytes(bytes);
5439+
return new String(bytes, StandardCharsets.UTF_8);
5440+
}).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" + i).toList());
5441+
}
5442+
54055443
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
54065444
Map<Long, Integer> ledgerErrors = new HashMap<>();
54075445

0 commit comments

Comments
 (0)