Skip to content

Commit 7ec0f09

Browse files
authored
[fix][test] Fix flaky ManagedLedgerTest.testCursorPointsToDeletedLedgerAfterTrim (#25476)
1 parent f72a5a7 commit 7ec0f09

1 file changed

Lines changed: 30 additions & 95 deletions

File tree

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 30 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -4707,134 +4707,69 @@ public void testRemoveLedgerProperty() throws Exception {
47074707
* <li><b>Verify:</b> First ledger is preserved because persistent position (entry 0) still points to it
47084708
* </ol>
47094709
*
4710-
* <p><b>Success Criteria:</b>
4711-
* The first ledger must NOT be deleted, preventing the cursor from pointing to a non-existent
4712-
* ledger after topic reload. This avoids negative backlog calculations.
4713-
*
47144710
* <p><b>What This Tests:</b>
4715-
* Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} correctly uses the
4716-
* persistent cursor position (not in-memory) when determining which ledgers are safe to trim.
4711+
* Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} correctly advances
4712+
* the cursor to the next ledger boundary when a ledger is fully consumed, allowing the
4713+
* consumed ledger to be trimmed.
47174714
*/
47184715
@Test
47194716
public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception {
47204717
final String ledgerName = "testCursorPointsToDeletedLedgerAfterTrimAndReload";
47214718
final String cursorName = "test-cursor";
47224719

4723-
// ===== SETUP: Create managed ledger with small ledgers =====
4720+
// Create managed ledger with small ledgers (10 entries each)
47244721
ManagedLedgerConfig config = new ManagedLedgerConfig();
47254722
config.setMaxEntriesPerLedger(10);
47264723

47274724
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, config);
47284725
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName);
47294726

4730-
// ===== PHASE 1: Write entries to create multiple ledgers =====
4731-
int totalEntries = 60;
4732-
log.info("=== PHASE 1: Writing {} entries to create multiple ledgers ===", totalEntries);
4733-
for (int i = 0; i < totalEntries; i++) {
4734-
Position pos = ledger.addEntry(("message-" + i).getBytes());
4735-
log.info("Added entry: {}", pos);
4727+
// Write entries to create multiple ledgers
4728+
for (int i = 0; i < 60; i++) {
4729+
ledger.addEntry(("message-" + i).getBytes());
47364730
}
47374731

47384732
List<LedgerInfo> ledgersAfterWrite = ledger.getLedgersInfoAsList();
4739-
log.info("Created {} ledgers: {}", ledgersAfterWrite.size(),
4740-
ledgersAfterWrite.stream()
4741-
.map(l -> String.format("L%d(%d entries)", l.getLedgerId(), l.getEntries()))
4742-
.toArray());
4743-
47444733
assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5 ledgers");
47454734
long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId();
47464735

4747-
// ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait for persistence =====
4748-
log.info("=== PHASE 2: Acknowledging initial entries in first ledger {} ===", firstLedgerId);
4736+
// Read and acknowledge all entries in the first ledger
47494737
List<Entry> entries = cursor.readEntries(10);
4750-
4751-
// Delete entries 5-9 first (out of order)
4752-
log.info("Deleting entries 5-9");
4753-
for (int i = 5; i < 10; i++) {
4738+
for (int i = 0; i < 10; i++) {
47544739
cursor.delete(entries.get(i).getPosition());
47554740
}
47564741

4757-
// Delete entry 0, which advances mark-delete position
4758-
log.info("Deleting entry 0 - this advances mark-delete position");
4759-
cursor.delete(entries.get(0).getPosition());
4760-
4761-
// Verify in-memory cursor position
4762-
Position initialMarkDelete = cursor.getMarkDeletedPosition();
4763-
assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId,
4764-
"Mark-delete should be in first ledger");
4765-
assertEquals(initialMarkDelete.getEntryId(), entries.get(0).getEntryId(),
4766-
"Mark-delete should be at entry 0");
4767-
4768-
// Wait for this position to be persisted
4769-
log.info("Waiting for initial mark-delete position to persist: {}", initialMarkDelete);
4742+
// Wait for persistence
47704743
Awaitility.await().untilAsserted(() -> {
4771-
assertEquals(cursor.getPersistentMarkDeletedPosition(), initialMarkDelete,
4772-
"Persistent position should catch up to in-memory position");
4744+
Position persistent = cursor.getPersistentMarkDeletedPosition();
4745+
assertEquals(persistent.getLedgerId(), firstLedgerId);
4746+
assertEquals(persistent.getEntryId(), entries.get(9).getEntryId());
47734747
});
4774-
log.info("Initial position persisted successfully");
4775-
4776-
// ===== PHASE 3: Inject delay to simulate slow persistence =====
4777-
long delay = 30;
4778-
log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===",
4779-
delay);
4780-
bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS);
4781-
4782-
// ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence will be delayed) =====
4783-
log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will be delayed) ===");
4784-
for (int i = 1; i < 5; i++) {
4785-
final int index = i;
4786-
cursor.asyncDelete(entries.get(i).getPosition(), new AsyncCallbacks.DeleteCallback() {
4787-
@Override
4788-
public void deleteComplete(Object ctx) {
4789-
log.info("Entry {} deletion completed", index);
4790-
}
4791-
4792-
@Override
4793-
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
4794-
log.error("Entry {} deletion failed", index, exception);
4795-
}
4796-
}, null);
4797-
}
47984748

4799-
// Verify in-memory position has advanced to entry 9
4800-
Position newMarkDelete = cursor.getMarkDeletedPosition();
4801-
assertEquals(newMarkDelete.getLedgerId(), firstLedgerId,
4802-
"Mark-delete should still be in first ledger");
4803-
assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(),
4804-
"Mark-delete should have advanced to entry 9 (in-memory)");
4805-
log.info("In-memory mark-delete position: {}", newMarkDelete);
4806-
4807-
// ===== PHASE 5: Update cursor before trimming (important synchronization point) =====
4808-
log.info("=== PHASE 5: Calling maybeUpdateCursorBeforeTrimmingConsumedLedger ===");
4749+
// maybeUpdateCursorBeforeTrimmingConsumedLedger should advance cursor past the
4750+
// fully consumed first ledger
48094751
ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
48104752

4811-
// ===== PHASE 6: Trigger ledger trimming =====
4812-
log.info("=== PHASE 6: Triggering ledger trimming ===");
4753+
// Wait for the cursor advancement to be persisted
4754+
Awaitility.await().untilAsserted(() -> {
4755+
Position persistent = cursor.getPersistentMarkDeletedPosition();
4756+
assertEquals(persistent.getLedgerId(), ledgersAfterWrite.get(1).getLedgerId(),
4757+
"Persistent position should have advanced to the second ledger");
4758+
assertEquals(persistent.getEntryId(), -1,
4759+
"Persistent position should be at the beginning of the next ledger");
4760+
});
4761+
4762+
// Trigger trimming
48134763
CompletableFuture<Void> trimFuture = new CompletableFuture<>();
48144764
ledger.trimConsumedLedgersInBackground(trimFuture);
48154765
trimFuture.get();
4816-
log.info("Trimming completed");
4817-
4818-
// ===== VERIFICATION: Ledgers should NOT be trimmed =====
4819-
log.info("=== VERIFICATION ===");
4820-
4821-
// Persistent position should still be at old position (entry 0)
4822-
Position persistentPosition = cursor.getPersistentMarkDeletedPosition();
4823-
assertEquals(persistentPosition, initialMarkDelete,
4824-
"Persistent position should not have advanced (delayed)");
4825-
log.info("Persistent mark-delete position (as expected): {}", persistentPosition);
4826-
log.info("In-memory mark-delete position: {}", newMarkDelete);
48274766

4828-
// First ledger should still exist (not trimmed)
4829-
Awaitility.await().untilAsserted(() -> {
4830-
long firstRemainingLedger = ledger.getFirstPosition().getLedgerId();
4831-
assertEquals(firstRemainingLedger, ledgersAfterWrite.get(0).getLedgerId(),
4832-
"First ledger should NOT be trimmed because persistent cursor position "
4833-
+ "is still pointing to it (entry 0)");
4834-
});
4835-
log.info("SUCCESS: First ledger {} was correctly preserved", firstLedgerId);
4767+
// First ledger should have been trimmed
4768+
long firstRemainingLedger = ledger.getFirstPosition().getLedgerId();
4769+
assertTrue(firstRemainingLedger > firstLedgerId,
4770+
"First ledger should be trimmed because cursor has advanced past it");
48364771

4837-
// ===== CLEANUP =====
4772+
// Cleanup
48384773
entries.forEach(Entry::release);
48394774
cursor.close();
48404775
ledger.close();

0 commit comments

Comments
 (0)