Skip to content

Commit be850c1

Browse files
HDDS-15080. DirectoryDeletingService is using single thread (#10125)
1 parent 0c19a9b commit be850c1

2 files changed

Lines changed: 65 additions & 59 deletions

File tree

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
159159
private final SnapshotChainManager snapshotChainManager;
160160
private final boolean deepCleanSnapshots;
161161
private ExecutorService deletionThreadPool;
162-
private final int numberOfParallelThreadsPerStore;
162+
private final AtomicInteger numberOfParallelThreadsPerStore;
163163
private final AtomicLong deletedDirsCount;
164164
private final AtomicLong movedDirsCount;
165165
private final AtomicLong movedFilesCount;
@@ -174,9 +174,8 @@ public DirectoryDeletingService(long interval, TimeUnit unit,
174174
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
175175
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
176176
StorageUnit.BYTES);
177-
this.numberOfParallelThreadsPerStore = dirDeletingServiceCorePoolSize;
178-
this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore,
179-
interval, unit, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
177+
this.numberOfParallelThreadsPerStore = new AtomicInteger(dirDeletingServiceCorePoolSize);
178+
this.deletionThreadPool = createDeletionThreadPool(numberOfParallelThreadsPerStore.get());
180179
// always go to 90% of max limit for request as other header will be added
181180
this.ratisByteLimit = (int) (limit * 0.9);
182181
registerReconfigCallbacks(ozoneManager.getReconfigurationHandler());
@@ -209,6 +208,7 @@ private synchronized void updateAndRestart(OzoneConfiguration conf) {
209208
shutdown();
210209
setInterval(newInterval, TimeUnit.SECONDS);
211210
setPoolSize(newCorePoolSize);
211+
this.numberOfParallelThreadsPerStore.set(newCorePoolSize);
212212
start();
213213
}
214214

@@ -251,16 +251,31 @@ public void shutdown() {
251251
@Override
252252
public synchronized void start() {
253253
if (deletionThreadPool == null || deletionThreadPool.isShutdown() || deletionThreadPool.isTerminated()) {
254-
this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore,
255-
super.getIntervalMillis(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
254+
this.deletionThreadPool = createDeletionThreadPool(numberOfParallelThreadsPerStore.get());
256255
}
257256
super.start();
258257
}
259258

259+
private ThreadPoolExecutor createDeletionThreadPool(int threadCount) {
260+
long intervalMillis = super.getIntervalMillis();
261+
ThreadPoolExecutor pool = new ThreadPoolExecutor(
262+
threadCount, threadCount, intervalMillis,
263+
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
264+
if (intervalMillis > 0) {
265+
pool.allowCoreThreadTimeOut(true);
266+
}
267+
return pool;
268+
}
269+
260270
private boolean isThreadPoolActive(ExecutorService threadPoolExecutor) {
261271
return threadPoolExecutor != null && !threadPoolExecutor.isShutdown() && !threadPoolExecutor.isTerminated();
262272
}
263273

274+
@VisibleForTesting
275+
ThreadPoolExecutor getDeletionThreadPool() {
276+
return (ThreadPoolExecutor) deletionThreadPool;
277+
}
278+
264279
@SuppressWarnings("checkstyle:ParameterNumber")
265280
void optimizeDirDeletesAndSubmitRequest(
266281
long dirNum, long subDirNum, long subFileNum,
@@ -602,7 +617,8 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key
602617
Map<UUID, Pair<Long, Long>> exclusiveSizeMap = Maps.newConcurrentMap();
603618

604619
CompletableFuture<Boolean> processedAllDeletedDirs = CompletableFuture.completedFuture(true);
605-
for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
620+
final int parallelThreads = numberOfParallelThreadsPerStore.get();
621+
for (int i = 0; i < parallelThreads; i++) {
606622
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
607623
try {
608624
return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier,

hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java

Lines changed: 42 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,23 @@
2222
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
2323
import static org.assertj.core.api.Assertions.assertThat;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
25-
import static org.mockito.ArgumentMatchers.any;
26-
import static org.mockito.Mockito.CALLS_REAL_METHODS;
27-
import static org.mockito.Mockito.mockStatic;
2825

2926
import java.io.File;
3027
import java.io.IOException;
3128
import java.nio.file.Files;
3229
import java.nio.file.Path;
3330
import java.util.ArrayList;
31+
import java.util.Collections;
3432
import java.util.HashMap;
33+
import java.util.HashSet;
3534
import java.util.List;
3635
import java.util.Map;
36+
import java.util.Set;
3737
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.ThreadPoolExecutor;
3840
import java.util.concurrent.TimeUnit;
39-
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.atomic.AtomicInteger;
41-
import java.util.function.Supplier;
42-
import org.apache.commons.lang3.tuple.Pair;
4342
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
4443
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
4544
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -66,18 +65,13 @@
6665
import org.junit.jupiter.api.DisplayName;
6766
import org.junit.jupiter.api.Test;
6867
import org.junit.jupiter.api.io.TempDir;
69-
import org.mockito.MockedStatic;
7068
import org.mockito.Mockito;
71-
import org.slf4j.Logger;
72-
import org.slf4j.LoggerFactory;
7369

7470
/**
7571
* Test Directory Deleting Service.
7672
*/
7773
public class TestDirectoryDeletingService {
7874

79-
private static final Logger LOG = LoggerFactory.getLogger(TestDirectoryDeletingService.class);
80-
8175
@TempDir
8276
private Path folder;
8377
private OzoneManager om;
@@ -189,52 +183,48 @@ public void testDeleteDirectoryCrossingSizeLimit() throws Exception {
189183
public void testMultithreadedDirectoryDeletion() throws Exception {
190184
int threadCount = 10;
191185
OzoneConfiguration conf = createConfAndInitValues(threadCount);
192-
OmTestManagers omTestManagers
193-
= new OmTestManagers(conf);
186+
OmTestManagers omTestManagers = new OmTestManagers(conf);
194187
OzoneManager ozoneManager = omTestManagers.getOzoneManager();
195-
AtomicBoolean isRunning = new AtomicBoolean(true);
196-
try (MockedStatic mockedStatic = mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
197-
List<Pair<Supplier, CompletableFuture>> futureList = new ArrayList<>();
198-
Thread deletionThread = new Thread(() -> {
199-
while (futureList.size() < threadCount) {
188+
try {
189+
DirectoryDeletingService service =
190+
(DirectoryDeletingService) ozoneManager.getKeyManager().getDirDeletingService();
191+
ThreadPoolExecutor threadPoolExecutor = service.getDeletionThreadPool();
192+
assertThat(threadPoolExecutor.getCorePoolSize()).as(
193+
"core pool size should match configured directory deletion threads").isEqualTo(threadCount);
194+
195+
CountDownLatch tasksStarted = new CountDownLatch(threadCount);
196+
CountDownLatch releaseTasks = new CountDownLatch(1);
197+
Set<String> workerThreads = Collections.synchronizedSet(new HashSet<>());
198+
List<CompletableFuture<Void>> futures = new ArrayList<>();
199+
200+
for (int i = 0; i < threadCount; i++) {
201+
futures.add(CompletableFuture.runAsync(() -> {
202+
workerThreads.add(Thread.currentThread().getName());
203+
tasksStarted.countDown();
200204
try {
201-
Thread.sleep(100);
205+
assertTrue(releaseTasks.await(10, TimeUnit.SECONDS));
202206
} catch (InterruptedException e) {
203-
LOG.error("Error while sleeping", e);
207+
Thread.currentThread().interrupt();
208+
throw new AssertionError("Interrupted while waiting for release latch", e);
204209
}
205-
}
206-
for (int i = futureList.size() - 1; i >= 0; i--) {
207-
Pair<Supplier, CompletableFuture> pair = futureList.get(i);
208-
pair.getLeft().get();
209-
assertTrue(isRunning.get());
210-
pair.getRight().complete(false);
211-
try {
212-
Thread.sleep(500);
213-
} catch (InterruptedException e) {
214-
LOG.error("Error while sleeping", e);
215-
}
216-
}
217-
});
218-
deletionThread.start();
219-
220-
mockedStatic
221-
.when(() -> CompletableFuture.supplyAsync(any(), any()))
222-
.thenAnswer(invocation -> {
223-
Supplier<Boolean> supplier = invocation.getArgument(0);
224-
CompletableFuture<Boolean> future = new CompletableFuture<>();
225-
futureList.add(Pair.of(supplier, future));
226-
return future;
227-
});
228-
ozoneManager.getKeyManager().getDirDeletingService().suspend();
229-
DirectoryDeletingService.DirDeletingTask dirDeletingTask =
230-
ozoneManager.getKeyManager().getDirDeletingService().new DirDeletingTask(null);
231-
232-
dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 6000);
233-
assertThat(futureList).hasSize(threadCount);
234-
for (Pair<Supplier, CompletableFuture> pair : futureList) {
235-
assertTrue(pair.getRight().isDone());
210+
}, threadPoolExecutor));
236211
}
237-
isRunning.set(false);
212+
213+
assertTrue(tasksStarted.await(10, TimeUnit.SECONDS), "Expected all submitted tasks to start");
214+
assertThat(workerThreads).as("Expected tasks to run in parallel using configured workers").hasSize(threadCount);
215+
216+
releaseTasks.countDown();
217+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
218+
219+
// Also exercise the actual DirectoryDeletingService task path.
220+
service.suspend();
221+
DirectoryDeletingService.DirDeletingTask dirDeletingTask = service.new DirDeletingTask(null);
222+
assertThat(threadPoolExecutor.isShutdown()).as(
223+
"deletion thread pool should be active when processing deleted dirs").isFalse();
224+
long completedTaskCountBefore = threadPoolExecutor.getCompletedTaskCount();
225+
dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 1);
226+
GenericTestUtils.waitFor(
227+
() -> threadPoolExecutor.getCompletedTaskCount() >= completedTaskCountBefore + threadCount, 100, 5000);
238228
} finally {
239229
ozoneManager.stop();
240230
}

0 commit comments

Comments
 (0)