Skip to content

Commit f0388a1

Browse files
authored
HDDS-10306. Speed up TestSnapshotBackgroundServices (#9721)
1 parent c3b3980 commit f0388a1

2 files changed

Lines changed: 133 additions & 62 deletions

File tree

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java

Lines changed: 98 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.io.IOException;
3737
import java.util.ArrayList;
3838
import java.util.Collections;
39+
import java.util.Iterator;
3940
import java.util.List;
4041
import java.util.Objects;
4142
import java.util.concurrent.TimeUnit;
@@ -62,7 +63,6 @@
6263
import org.apache.hadoop.ozone.conf.OMClientConfig;
6364
import org.apache.hadoop.ozone.om.OMConfigKeys;
6465
import org.apache.hadoop.ozone.om.OmSnapshot;
65-
import org.apache.hadoop.ozone.om.OmTestUtil;
6666
import org.apache.hadoop.ozone.om.OzoneManager;
6767
import org.apache.hadoop.ozone.om.SstFilteringService;
6868
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -80,15 +80,21 @@
8080
import org.apache.ozone.test.tag.Flaky;
8181
import org.apache.ratis.server.protocol.TermIndex;
8282
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
83-
import org.junit.jupiter.api.AfterEach;
83+
import org.junit.jupiter.api.AfterAll;
84+
import org.junit.jupiter.api.BeforeAll;
8485
import org.junit.jupiter.api.BeforeEach;
8586
import org.junit.jupiter.api.DisplayName;
87+
import org.junit.jupiter.api.MethodOrderer;
88+
import org.junit.jupiter.api.Order;
8689
import org.junit.jupiter.api.Test;
87-
import org.junit.jupiter.api.TestInfo;
90+
import org.junit.jupiter.api.TestInstance;
91+
import org.junit.jupiter.api.TestMethodOrder;
8892

8993
/**
9094
* Tests snapshot background services.
9195
*/
96+
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
97+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
9298
public class TestSnapshotBackgroundServices {
9399
private MiniOzoneHAClusterImpl cluster;
94100
private ObjectStore objectStore;
@@ -105,8 +111,8 @@ public class TestSnapshotBackgroundServices {
105111
private OzoneClient client;
106112
private final AtomicInteger counter = new AtomicInteger();
107113

108-
@BeforeEach
109-
public void init(TestInfo testInfo) throws Exception {
114+
@BeforeAll
115+
public void init() throws Exception {
110116
OzoneConfiguration conf = new OzoneConfiguration();
111117
String omServiceId = "om-service-test1";
112118
OzoneManagerRatisServerConfig omRatisConf = conf.getObject(OzoneManagerRatisServerConfig.class);
@@ -120,66 +126,48 @@ public void init(TestInfo testInfo) throws Exception {
120126
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
121127
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, StorageUnit.KB);
122128
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
123-
if ("testSSTFilteringBackgroundService".equals(testInfo.getDisplayName())) {
124-
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1,
125-
TimeUnit.SECONDS);
126-
}
127-
if ("testCompactionLogBackgroundService"
128-
.equals(testInfo.getDisplayName())) {
129-
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1,
130-
TimeUnit.MILLISECONDS);
131-
}
132-
if ("testBackupCompactionFilesPruningBackgroundService"
133-
.equals(testInfo.getDisplayName())) {
134-
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1,
135-
TimeUnit.MILLISECONDS);
136-
conf.setTimeDuration(
137-
OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 1,
138-
TimeUnit.SECONDS);
139-
}
140-
if ("testSnapshotAndKeyDeletionBackgroundServices"
141-
.equals(testInfo.getDisplayName())) {
142-
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1,
143-
TimeUnit.SECONDS);
144-
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1,
145-
TimeUnit.SECONDS);
146-
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1,
147-
TimeUnit.MILLISECONDS);
148-
conf.setTimeDuration(
149-
OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3,
150-
TimeUnit.SECONDS);
151-
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 3,
152-
TimeUnit.SECONDS);
153-
}
129+
130+
// Used by: testSSTFilteringBackgroundService
131+
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);
132+
133+
// Used by: testCompactionLogBackgroundService, testBackupCompactionFilesPruningBackgroundService,
134+
// testSnapshotAndKeyDeletionBackgroundServices
135+
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, TimeUnit.MILLISECONDS);
136+
137+
// Used by: testBackupCompactionFilesPruningBackgroundService, testSnapshotAndKeyDeletionBackgroundServices
138+
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3, TimeUnit.SECONDS);
139+
140+
// Used by: testSnapshotAndKeyDeletionBackgroundServices
141+
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);
142+
// Used by: testSnapshotAndKeyDeletionBackgroundServices
143+
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);
144+
154145
conf.setLong(
155146
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
156147
SNAPSHOT_THRESHOLD);
157148
int numOfOMs = 3;
158149
cluster = MiniOzoneCluster.newHABuilder(conf)
159-
.setOMServiceId("om-service-test1")
150+
.setOMServiceId(omServiceId)
160151
.setNumOfOzoneManagers(numOfOMs)
161-
.setNumOfActiveOMs(2)
152+
.setNumOfActiveOMs(numOfOMs)
162153
.build();
163-
if ("testBackupCompactionFilesPruningBackgroundService"
164-
.equals(testInfo.getDisplayName())) {
165-
cluster.
166-
getOzoneManagersList()
167-
.forEach(
168-
TestSnapshotBackgroundServices
169-
::suspendBackupCompactionFilesPruning);
170-
}
154+
171155
cluster.waitForClusterToBeReady();
172156
client = OzoneClientFactory.getRpcClient(omServiceId, conf);
173157
objectStore = client.getObjectStore();
158+
}
159+
160+
@BeforeEach
161+
public void setupTest() throws IOException, InterruptedException, TimeoutException {
162+
recoverCluster();
163+
stopFollowerOM(cluster.getOMLeader());
174164

175165
volumeName = "volume" + counter.incrementAndGet();
176166
bucketName = "bucket" + counter.incrementAndGet();
177-
178167
VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
179168
.setOwner("user" + counter.incrementAndGet())
180169
.setAdmin("admin" + counter.incrementAndGet())
181170
.build();
182-
183171
objectStore.createVolume(volumeName, createVolumeArgs);
184172
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
185173

@@ -188,7 +176,32 @@ public void init(TestInfo testInfo) throws Exception {
188176
ozoneBucket = retVolumeinfo.getBucket(bucketName);
189177
}
190178

191-
@AfterEach
179+
private void recoverCluster() throws InterruptedException, TimeoutException, IOException {
180+
for (OzoneManager ozoneManager : cluster.getOzoneManagersList()) {
181+
if (!ozoneManager.isRunning()) {
182+
cluster.restartOzoneManager(ozoneManager, false);
183+
}
184+
185+
if (!ozoneManager.getMetadataManager().getStore().getRocksDBCheckpointDiffer().shouldRun()) {
186+
resumeBackupCompactionFilesPruning(ozoneManager);
187+
}
188+
}
189+
cluster.waitForClusterToBeReady();
190+
cluster.waitForLeaderOM();
191+
}
192+
193+
private void stopFollowerOM(OzoneManager leaderOM) throws TimeoutException, InterruptedException {
194+
for (OzoneManager om : cluster.getOzoneManagersList()) {
195+
if (om != leaderOM && om.isRunning()) {
196+
String omNodeId = om.getOMNodeId();
197+
cluster.stopOzoneManager(omNodeId);
198+
GenericTestUtils.waitFor(() -> !om.isRunning() && !om.isOmRpcServerRunning(), 100, 15000);
199+
break;
200+
}
201+
}
202+
}
203+
204+
@AfterAll
192205
public void shutdown() {
193206
IOUtils.closeQuietly(client);
194207
if (cluster != null) {
@@ -202,7 +215,7 @@ public void shutdown() {
202215
public void testSnapshotAndKeyDeletionBackgroundServices()
203216
throws Exception {
204217
OzoneManager leaderOM = getLeaderOM();
205-
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
218+
OzoneManager followerOM = getInactiveFollowerOM();
206219

207220
createSnapshotsEachWithNewKeys(leaderOM);
208221

@@ -332,7 +345,7 @@ private void startInactiveFollower(OzoneManager leaderOM,
332345
long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
333346

334347
// Start the inactive OM. Checkpoint installation will happen spontaneously.
335-
cluster.startInactiveOM(followerOM.getOMNodeId());
348+
cluster.restartOzoneManager(followerOM, true);
336349
actionAfterStarting.run();
337350

338351
// The recently started OM should be lagging behind the leader OM.
@@ -357,26 +370,41 @@ private void createSnapshotsEachWithNewKeys(OzoneManager ozoneManager)
357370
}
358371
}
359372

360-
private OzoneManager getInactiveFollowerOM(OzoneManager leaderOM) {
361-
String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
362-
if (cluster.isOMActive(followerNodeId)) {
363-
followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
364-
}
365-
return cluster.getOzoneManager(followerNodeId);
373+
private OzoneManager getInactiveFollowerOM()
374+
throws TimeoutException, InterruptedException {
375+
// Wait for an inactive OM to be available
376+
AtomicReference<OzoneManager> inactiveOM = new AtomicReference<>();
377+
GenericTestUtils.waitFor(() -> {
378+
Iterator<OzoneManager> iterator = cluster.getInactiveOM();
379+
if (iterator.hasNext()) {
380+
inactiveOM.set(iterator.next());
381+
return true;
382+
}
383+
return false;
384+
}, 100, 10000);
385+
OzoneManager result = inactiveOM.get();
386+
assertNotNull(result, "No inactive OM available");
387+
return result;
366388
}
367389

368390
private OzoneManager getLeaderOM() {
369-
final String leaderOMNodeId = OmTestUtil.getCurrentOmProxyNodeId(objectStore);
370-
return cluster.getOzoneManager(leaderOMNodeId);
391+
return cluster.getOMLeader();
371392
}
372393

373394
@Test
374395
@DisplayName("testCompactionLogBackgroundService")
375396
@Flaky("HDDS-11672")
397+
@Order(Integer.MAX_VALUE)
376398
public void testCompactionLogBackgroundService()
377399
throws IOException, InterruptedException, TimeoutException {
400+
401+
// reset to the default value to avoid side effects
402+
cluster.restartOzoneManagersWithConfigCustomizer(config -> {
403+
config.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 10, TimeUnit.MINUTES);
404+
});
405+
378406
OzoneManager leaderOM = getLeaderOM();
379-
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
407+
OzoneManager followerOM = getInactiveFollowerOM();
380408

381409
createSnapshotsEachWithNewKeys(leaderOM);
382410

@@ -407,6 +435,7 @@ public void testCompactionLogBackgroundService()
407435
newLeaderOM.getMetadataManager().getStore()
408436
.getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes()
409437
.stream().map(CompactionNode::getFileName).collect(toSet()));
438+
410439
assertEquals(leaderOM.getMetadataManager().getStore()
411440
.getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges()
412441
.stream().map(edge ->
@@ -440,8 +469,14 @@ private List<CompactionLogEntry> getCompactionLogEntries(OzoneManager om)
440469
@DisplayName("testBackupCompactionFilesPruningBackgroundService")
441470
public void testBackupCompactionFilesPruningBackgroundService()
442471
throws IOException, InterruptedException, TimeoutException {
472+
473+
cluster.getOzoneManagersList().stream()
474+
.filter(OzoneManager::isRunning)
475+
.forEach(TestSnapshotBackgroundServices::suspendBackupCompactionFilesPruning);
476+
443477
OzoneManager leaderOM = getLeaderOM();
444-
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
478+
OzoneManager followerOM = getInactiveFollowerOM();
479+
445480

446481
startInactiveFollower(leaderOM, followerOM,
447482
() -> suspendBackupCompactionFilesPruning(followerOM));
@@ -463,6 +498,7 @@ public void testBackupCompactionFilesPruningBackgroundService()
463498
assertNotNull(files);
464499
int numberOfSstFiles = files.length;
465500

501+
assertEquals(cluster.getOMLeader(), newLeaderOM);
466502
resumeBackupCompactionFilesPruning(newLeaderOM);
467503

468504
checkIfCompactionBackupFilesWerePruned(sstBackupDir,
@@ -494,7 +530,7 @@ private static void suspendBackupCompactionFilesPruning(
494530
public void testSSTFilteringBackgroundService()
495531
throws IOException, InterruptedException, TimeoutException {
496532
OzoneManager leaderOM = getLeaderOM();
497-
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
533+
OzoneManager followerOM = getInactiveFollowerOM();
498534

499535
createSnapshotsEachWithNewKeys(leaderOM);
500536

hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.TimeoutException;
36+
import java.util.function.Consumer;
3637
import java.util.function.Function;
3738
import org.apache.commons.lang3.StringUtils;
3839
import org.apache.hadoop.hdds.ExitManager;
@@ -125,6 +126,10 @@ public Iterator<StorageContainerManager> getInactiveSCM() {
125126
return scmhaService.inactiveServices();
126127
}
127128

129+
public Iterator<OzoneManager> getInactiveOM() {
130+
return omhaService.inactiveServices();
131+
}
132+
128133
public StorageContainerManager getSCM(String scmNodeId) {
129134
return this.scmhaService.getServiceById(scmNodeId);
130135
}
@@ -141,6 +146,30 @@ public List<OzoneManager> getOzoneManagersList() {
141146
return omhaService.getServices();
142147
}
143148

149+
public void restartOzoneManagersWithConfigCustomizer(Consumer<OzoneConfiguration> configCustomizer)
150+
throws IOException, TimeoutException, InterruptedException {
151+
List<OzoneManager> toRestart = new ArrayList<>();
152+
for (OzoneManager om : getOzoneManagersList()) {
153+
OzoneConfiguration configuration = new OzoneConfiguration(om.getConfiguration());
154+
if (configCustomizer != null) {
155+
configCustomizer.accept(configuration);
156+
}
157+
om.setConfiguration(configuration);
158+
if (om.isRunning()) {
159+
toRestart.add(om);
160+
}
161+
}
162+
for (OzoneManager om : toRestart) {
163+
if (!om.stop()) {
164+
continue;
165+
}
166+
om.join();
167+
om.restart();
168+
GenericTestUtils.waitFor(om::isRunning, 1000, 30000);
169+
}
170+
waitForLeaderOM();
171+
}
172+
144173
public List<StorageContainerManager> getStorageContainerManagersList() {
145174
return scmhaService.getServices();
146175
}
@@ -244,6 +273,12 @@ public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM)
244273
GenericTestUtils.waitFor(ozoneManager::isRunning,
245274
1000, waitForClusterToBeReadyTimeout);
246275
}
276+
277+
omhaService.inactiveServices().forEachRemaining(om -> {
278+
if (om.equals(ozoneManager)) {
279+
this.omhaService.activate(om);
280+
}
281+
});
247282
}
248283

249284
public void shutdownStorageContainerManager(StorageContainerManager scm) {

0 commit comments

Comments
 (0)