Skip to content

Commit 0621ae6

Browse files
Only stop GC for the full disk for DbLedgerStorage (#4661)
* Only stop GC for the full disk for DbLedgerStorage
1 parent 5870922 commit 0621ae6

3 files changed

Lines changed: 171 additions & 6 deletions

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ boolean isFlushRequired() {
449449
}
450450

451451
@VisibleForTesting
452-
List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
452+
public List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
453453
return ledgerStorageList;
454454
}
455455

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.EnumSet;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.Objects;
4243
import java.util.PrimitiveIterator.OfLong;
4344
import java.util.concurrent.CopyOnWriteArrayList;
4445
import java.util.concurrent.ExecutorService;
@@ -149,6 +150,8 @@ protected Thread newThread(Runnable r, String name) {
149150

150151
private final Counter flushExecutorTime;
151152
private final boolean singleLedgerDirs;
153+
private final String ledgerBaseDir;
154+
private final String indexBaseDir;
152155

153156
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
154157
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
@@ -158,8 +161,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
158161
throws IOException {
159162
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
160163
"Db implementation only allows for one storage dir");
161-
162-
String ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath();
164+
ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath();
163165
// indexBaseDir default use ledgerBaseDir
164166
String indexBaseDir = ledgerBaseDir;
165167
if (CollectionUtils.isEmpty(indexDirsManager.getAllLedgerDirs())
@@ -172,6 +174,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
172174
log.info("indexDir is specified a separate dir, creating single directory db ledger storage on {}",
173175
indexBaseDir);
174176
}
177+
this.indexBaseDir = indexBaseDir;
175178

176179
StatsLogger ledgerIndexDirStatsLogger = statsLogger
177180
.scopeLabel("ledgerDir", ledgerBaseDir)
@@ -228,9 +231,9 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
228231
flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
229232
});
230233

231-
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
234+
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener(ledgerBaseDir));
232235
if (!ledgerBaseDir.equals(indexBaseDir)) {
233-
indexDirsManager.addLedgerDirsListener(getLedgerDirsListener());
236+
indexDirsManager.addLedgerDirsListener(getLedgerDirsListener(indexBaseDir));
234237
}
235238
}
236239

@@ -1151,11 +1154,19 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
11511154
"getListOfEntriesOfLedger method is currently unsupported for SingleDirectoryDbLedgerStorage");
11521155
}
11531156

1154-
private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() {
1157+
private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener(String diskPath) {
11551158
return new LedgerDirsListener() {
1159+
private final String currentFilePath = diskPath;
1160+
1161+
private boolean isCurrentFile(File disk) {
1162+
return Objects.equals(disk.getPath(), currentFilePath);
1163+
}
11561164

11571165
@Override
11581166
public void diskAlmostFull(File disk) {
1167+
if (!isCurrentFile(disk)) {
1168+
return;
1169+
}
11591170
if (gcThread.isForceGCAllowWhenNoSpace()) {
11601171
gcThread.enableForceGC();
11611172
} else {
@@ -1165,6 +1176,9 @@ public void diskAlmostFull(File disk) {
11651176

11661177
@Override
11671178
public void diskFull(File disk) {
1179+
if (!isCurrentFile(disk)) {
1180+
return;
1181+
}
11681182
if (gcThread.isForceGCAllowWhenNoSpace()) {
11691183
gcThread.enableForceGC();
11701184
} else {
@@ -1185,6 +1199,9 @@ public void allDisksFull(boolean highPriorityWritesAllowed) {
11851199

11861200
@Override
11871201
public void diskWritable(File disk) {
1202+
if (!isCurrentFile(disk)) {
1203+
return;
1204+
}
11881205
// we have enough space now
11891206
if (gcThread.isForceGCAllowWhenNoSpace()) {
11901207
// disable force gc.
@@ -1198,6 +1215,9 @@ public void diskWritable(File disk) {
11981215

11991216
@Override
12001217
public void diskJustWritable(File disk) {
1218+
if (!isCurrentFile(disk)) {
1219+
return;
1220+
}
12011221
if (gcThread.isForceGCAllowWhenNoSpace()) {
12021222
// if a disk is just writable, we still need force gc.
12031223
gcThread.enableForceGC();
@@ -1300,4 +1320,14 @@ public void clearStorageStateFlag(StorageState flag) throws IOException {
13001320
DbLedgerStorageStats getDbLedgerStorageStats() {
13011321
return dbLedgerStorageStats;
13021322
}
1323+
1324+
@VisibleForTesting
1325+
public String getLedgerBaseDir() {
1326+
return ledgerBaseDir;
1327+
}
1328+
1329+
@VisibleForTesting
1330+
public String getIndexBaseDir() {
1331+
return indexBaseDir;
1332+
}
13031333
}

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,28 @@
2121

2222
package org.apache.bookkeeper.bookie;
2323

24+
import static org.junit.Assert.assertEquals;
2425
import static org.junit.Assert.assertFalse;
2526
import static org.junit.Assert.assertTrue;
2627
import static org.junit.Assert.fail;
2728

2829
import java.io.File;
2930
import java.util.Collections;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Objects;
34+
import java.util.concurrent.ConcurrentHashMap;
3035
import java.util.concurrent.CountDownLatch;
3136
import java.util.concurrent.TimeUnit;
3237
import java.util.concurrent.atomic.AtomicBoolean;
3338
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
3439
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
40+
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
3541
import org.apache.bookkeeper.client.BookKeeper.DigestType;
3642
import org.apache.bookkeeper.client.LedgerHandle;
3743
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
3844
import org.apache.bookkeeper.conf.ServerConfiguration;
45+
import org.apache.bookkeeper.conf.TestBKConfiguration;
3946
import org.apache.bookkeeper.proto.BookieServer;
4047
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
4148
import org.apache.bookkeeper.util.DiskChecker;
@@ -250,4 +257,132 @@ public void diskFull(File disk) {
250257
Thread.sleep(500);
251258
assertFalse("Bookie should be transitioned to ReadWrite", bookie.isReadOnly());
252259
}
260+
261+
@org.junit.Test
262+
public void testStopGCOnCorrespondingDiskWhenDiskFull() throws Exception {
263+
// 1. Create test directories
264+
File ledgerDir1 = tmpDirs.createNew("ledger", "test1");
265+
File ledgerDir2 = tmpDirs.createNew("ledger", "test2");
266+
267+
// 2. Configure Bookie
268+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
269+
conf.setGcWaitTime(1000);
270+
conf.setLedgerDirNames(new String[] { ledgerDir1.getPath(), ledgerDir2.getPath() });
271+
conf.setDiskCheckInterval(100); // Shorten disk check interval
272+
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
273+
274+
// 3. Start Bookie and obtain internal components
275+
Bookie bookie = new TestBookieImpl(conf);
276+
BookieImpl bookieImpl = (BookieImpl) bookie;
277+
LedgerDirsManager ledgerDirsManager = bookieImpl.getLedgerDirsManager();
278+
279+
// 4. Create custom disk checker (key step)
280+
GCTestDiskChecker diskChecker = new GCTestDiskChecker(
281+
conf.getDiskUsageThreshold(),
282+
conf.getDiskUsageWarnThreshold()
283+
);
284+
// Set directory status: dir1 full (100%), dir2 normal (50%)
285+
File[] currentDirectories = BookieImpl.getCurrentDirectories(new File[] { ledgerDir1, ledgerDir2 });
286+
diskChecker.setUsageMap(currentDirectories[0], 1.0f); // 100% usage
287+
diskChecker.setUsageMap(currentDirectories[1], 0.5f); // 50% usage
288+
289+
// 5. Replace Bookie's disk checker
290+
bookieImpl.dirsMonitor.shutdown(); // Stop default monitor
291+
bookieImpl.dirsMonitor = new LedgerDirsMonitor(
292+
conf,
293+
diskChecker,
294+
Collections.singletonList(ledgerDirsManager)
295+
);
296+
bookieImpl.dirsMonitor.start();
297+
298+
// 6. Add disk state listener
299+
CountDownLatch dir1Full = new CountDownLatch(1);
300+
CountDownLatch dir1Writable = new CountDownLatch(1);
301+
302+
ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() {
303+
@Override
304+
public void diskFull(File disk) {
305+
if (disk.equals(currentDirectories[0])) {
306+
dir1Full.countDown();
307+
}
308+
}
309+
310+
@Override
311+
public void diskWritable(File disk) {
312+
if (disk.equals(currentDirectories[0])) {
313+
dir1Writable.countDown();
314+
}
315+
}
316+
});
317+
318+
// 7. Wait for state update (ensure event is triggered)
319+
assertTrue("dir1 did not trigger full state", dir1Full.await(30, TimeUnit.SECONDS));
320+
321+
// 8. Verify directory status
322+
List<File> fullDirs = ledgerDirsManager.getFullFilledLedgerDirs();
323+
List<File> writableDirs = ledgerDirsManager.getWritableLedgerDirs();
324+
325+
assertTrue("dir1 should be marked as full", fullDirs.contains(currentDirectories[0]));
326+
assertTrue("dir2 should remain writable", writableDirs.contains(currentDirectories[1]));
327+
assertEquals("Only 1 writable directory should remain", 1, writableDirs.size());
328+
329+
// 9. Verify GC status
330+
((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> {
331+
if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) {
332+
assertTrue("dir1 should suspend minor GC", storage.isMinorGcSuspended());
333+
assertTrue("dir1 should suspend major GC", storage.isMajorGcSuspended());
334+
} else {
335+
assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended());
336+
assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended());
337+
}
338+
});
339+
340+
// 10. Restore dir1 status
341+
diskChecker.setUsageMap(currentDirectories[0], 0.5f); // 50% usage
342+
assertTrue("dir1 did not become writable again", dir1Writable.await(3, TimeUnit.SECONDS));
343+
344+
// 11. Verify GC status after recovery
345+
((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> {
346+
if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) {
347+
assertFalse("dir1 should not suspend minor GC", storage.isMinorGcSuspended());
348+
assertFalse("dir1 should not suspend major GC", storage.isMajorGcSuspended());
349+
} else {
350+
assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended());
351+
assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended());
352+
}
353+
});
354+
355+
// 12. Cleanup
356+
bookie.shutdown();
357+
}
358+
359+
// Custom disk checker (simulate different usage for directories)
360+
static class GCTestDiskChecker extends DiskChecker {
361+
private final Map<File, Float> usageMap = new ConcurrentHashMap<>();
362+
363+
public GCTestDiskChecker(float threshold, float warnThreshold) {
364+
super(threshold, warnThreshold);
365+
}
366+
367+
// Set simulated usage for a directory
368+
public void setUsageMap(File dir, float usage) {
369+
usageMap.put(dir, usage);
370+
}
371+
372+
@Override
373+
public float checkDir(File dir) throws DiskErrorException, DiskWarnThresholdException, DiskOutOfSpaceException {
374+
Float usage = usageMap.get(dir);
375+
if (usage == null) {
376+
return super.checkDir(dir); // Default behavior
377+
}
378+
// Throw exception based on preset usage rate
379+
if (usage >= 1.0) {
380+
throw new DiskOutOfSpaceException("Simulated disk full", usage);
381+
} else if (usage >= 0.9) {
382+
throw new DiskWarnThresholdException("Simulated disk warning", usage);
383+
}
384+
return usage;
385+
}
386+
}
387+
253388
}

0 commit comments

Comments
 (0)