Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
private final SnapshotChainManager snapshotChainManager;
private final boolean deepCleanSnapshots;
private ExecutorService deletionThreadPool;
private final int numberOfParallelThreadsPerStore;
private final AtomicInteger numberOfParallelThreadsPerStore;
private final AtomicLong deletedDirsCount;
private final AtomicLong movedDirsCount;
private final AtomicLong movedFilesCount;
Expand All @@ -174,9 +174,8 @@ public DirectoryDeletingService(long interval, TimeUnit unit,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
this.numberOfParallelThreadsPerStore = dirDeletingServiceCorePoolSize;
this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore,
interval, unit, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
this.numberOfParallelThreadsPerStore = new AtomicInteger(dirDeletingServiceCorePoolSize);
this.deletionThreadPool = createDeletionThreadPool(numberOfParallelThreadsPerStore.get());
// always go to 90% of max limit for request as other header will be added
this.ratisByteLimit = (int) (limit * 0.9);
registerReconfigCallbacks(ozoneManager.getReconfigurationHandler());
Expand Down Expand Up @@ -209,6 +208,7 @@ private synchronized void updateAndRestart(OzoneConfiguration conf) {
shutdown();
setInterval(newInterval, TimeUnit.SECONDS);
setPoolSize(newCorePoolSize);
this.numberOfParallelThreadsPerStore.set(newCorePoolSize);
start();
}

Expand Down Expand Up @@ -251,16 +251,31 @@ public void shutdown() {
@Override
public synchronized void start() {
if (deletionThreadPool == null || deletionThreadPool.isShutdown() || deletionThreadPool.isTerminated()) {
this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore,
super.getIntervalMillis(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
this.deletionThreadPool = createDeletionThreadPool(numberOfParallelThreadsPerStore.get());
}
super.start();
}

private ThreadPoolExecutor createDeletionThreadPool(int threadCount) {
long intervalMillis = super.getIntervalMillis();
ThreadPoolExecutor pool = new ThreadPoolExecutor(
threadCount, threadCount, intervalMillis,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
if (intervalMillis > 0) {
pool.allowCoreThreadTimeOut(true);
}
return pool;
}

private boolean isThreadPoolActive(ExecutorService threadPoolExecutor) {
return threadPoolExecutor != null && !threadPoolExecutor.isShutdown() && !threadPoolExecutor.isTerminated();
}

@VisibleForTesting
ThreadPoolExecutor getDeletionThreadPool() {
return (ThreadPoolExecutor) deletionThreadPool;
}

@SuppressWarnings("checkstyle:ParameterNumber")
void optimizeDirDeletesAndSubmitRequest(
long dirNum, long subDirNum, long subFileNum,
Expand Down Expand Up @@ -602,7 +617,8 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key
Map<UUID, Pair<Long, Long>> exclusiveSizeMap = Maps.newConcurrentMap();

CompletableFuture<Boolean> processedAllDeletedDirs = CompletableFuture.completedFuture(true);
for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
final int parallelThreads = numberOfParallelThreadsPerStore.get();
for (int i = 0; i < parallelThreads; i++) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,23 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mockStatic;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -66,18 +65,13 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test Directory Deleting Service.
*/
public class TestDirectoryDeletingService {

private static final Logger LOG = LoggerFactory.getLogger(TestDirectoryDeletingService.class);

@TempDir
private Path folder;
private OzoneManager om;
Expand Down Expand Up @@ -189,52 +183,48 @@ public void testDeleteDirectoryCrossingSizeLimit() throws Exception {
public void testMultithreadedDirectoryDeletion() throws Exception {
int threadCount = 10;
OzoneConfiguration conf = createConfAndInitValues(threadCount);
OmTestManagers omTestManagers
= new OmTestManagers(conf);
OmTestManagers omTestManagers = new OmTestManagers(conf);
OzoneManager ozoneManager = omTestManagers.getOzoneManager();
AtomicBoolean isRunning = new AtomicBoolean(true);
try (MockedStatic mockedStatic = mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
List<Pair<Supplier, CompletableFuture>> futureList = new ArrayList<>();
Thread deletionThread = new Thread(() -> {
while (futureList.size() < threadCount) {
try {
DirectoryDeletingService service =
(DirectoryDeletingService) ozoneManager.getKeyManager().getDirDeletingService();
ThreadPoolExecutor threadPoolExecutor = service.getDeletionThreadPool();
assertThat(threadPoolExecutor.getCorePoolSize()).as(
"core pool size should match configured directory deletion threads").isEqualTo(threadCount);

CountDownLatch tasksStarted = new CountDownLatch(threadCount);
CountDownLatch releaseTasks = new CountDownLatch(1);
Set<String> workerThreads = Collections.synchronizedSet(new HashSet<>());
List<CompletableFuture<Void>> futures = new ArrayList<>();

for (int i = 0; i < threadCount; i++) {
futures.add(CompletableFuture.runAsync(() -> {
workerThreads.add(Thread.currentThread().getName());
tasksStarted.countDown();
try {
Thread.sleep(100);
assertTrue(releaseTasks.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
LOG.error("Error while sleeping", e);
Thread.currentThread().interrupt();
throw new AssertionError("Interrupted while waiting for release latch", e);
}
}
for (int i = futureList.size() - 1; i >= 0; i--) {
Pair<Supplier, CompletableFuture> pair = futureList.get(i);
pair.getLeft().get();
assertTrue(isRunning.get());
pair.getRight().complete(false);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.error("Error while sleeping", e);
}
}
});
deletionThread.start();

mockedStatic
.when(() -> CompletableFuture.supplyAsync(any(), any()))
.thenAnswer(invocation -> {
Supplier<Boolean> supplier = invocation.getArgument(0);
CompletableFuture<Boolean> future = new CompletableFuture<>();
futureList.add(Pair.of(supplier, future));
return future;
});
ozoneManager.getKeyManager().getDirDeletingService().suspend();
DirectoryDeletingService.DirDeletingTask dirDeletingTask =
ozoneManager.getKeyManager().getDirDeletingService().new DirDeletingTask(null);

dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 6000);
assertThat(futureList).hasSize(threadCount);
for (Pair<Supplier, CompletableFuture> pair : futureList) {
assertTrue(pair.getRight().isDone());
}, threadPoolExecutor));
}
isRunning.set(false);

assertTrue(tasksStarted.await(10, TimeUnit.SECONDS), "Expected all submitted tasks to start");
assertThat(workerThreads).as("Expected tasks to run in parallel using configured workers").hasSize(threadCount);

releaseTasks.countDown();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);

// Also exercise the actual DirectoryDeletingService task path.
service.suspend();
DirectoryDeletingService.DirDeletingTask dirDeletingTask = service.new DirDeletingTask(null);
assertThat(threadPoolExecutor.isShutdown()).as(
"deletion thread pool should be active when processing deleted dirs").isFalse();
long completedTaskCountBefore = threadPoolExecutor.getCompletedTaskCount();
dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 1);
GenericTestUtils.waitFor(
() -> threadPoolExecutor.getCompletedTaskCount() >= completedTaskCountBefore + threadCount, 100, 5000);
} finally {
ozoneManager.stop();
}
Expand Down