@@ -1767,10 +1767,11 @@ public void testJoinNodes() throws Exception {
17671767 assertTrue (cluster .start (peer1 , false , 300 ));
17681768
17691769 // add peer1
1770- CountDownLatch latch = new CountDownLatch (1 );
17711770 peers .add (peer1 );
1772- leader .addPeer (peer1 .getPeerId (), 5L , new ExpectClosure (latch ));
1773- waitLatch (latch );
1771+ SynchronizedClosure addPeer1Done = new SynchronizedClosure ();
1772+ leader .addPeer (peer1 .getPeerId (), 5L , addPeer1Done );
1773+ Status addPeer1Status = assertTimeoutPreemptively (Duration .ofSeconds (30 ), addPeer1Done ::await );
1774+ assertTrue (addPeer1Status .isOk (), "addPeer(peer1) failed: " + addPeer1Status );
17741775
17751776 cluster .ensureSame ();
17761777 assertEquals (2 , cluster .getFsms ().size ());
@@ -1779,25 +1780,34 @@ public void testJoinNodes() throws Exception {
17791780
17801781 // add peer2 but not start
17811782 peers .add (peer2 );
1782- latch = new CountDownLatch (1 );
1783- leader .addPeer (peer2 .getPeerId (), 6L , new ExpectClosure (RaftError .ECATCHUP , latch ));
1784- waitLatch (latch );
1783+ SynchronizedClosure addPeer2BeforeStartDone = new SynchronizedClosure ();
1784+ leader .addPeer (peer2 .getPeerId (), 6L , addPeer2BeforeStartDone );
1785+ Status addPeer2BeforeStartStatus = assertTimeoutPreemptively (Duration .ofSeconds (30 ), addPeer2BeforeStartDone ::await );
1786+ assertEquals (RaftError .ECATCHUP , addPeer2BeforeStartStatus .getRaftError (),
1787+ "addPeer(peer2) before startup should fail with ECATCHUP: " + addPeer2BeforeStartStatus );
17851788
17861789 // start peer2 after 2 seconds
17871790 Thread .sleep (2000 );
17881791 assertTrue (cluster .start (peer2 , false , 300 ));
17891792
17901793 // re-add peer2
1791- latch = new CountDownLatch ( 2 );
1792- leader .addPeer (peer2 .getPeerId (), 7L , new ExpectClosure ( latch ) );
1794+ SynchronizedClosure addPeer2Done = new SynchronizedClosure ( );
1795+ leader .addPeer (peer2 .getPeerId (), 7L , addPeer2Done );
17931796 // concurrent configuration change
1794- leader .addPeer (peer3 .getPeerId (), 8L , new ExpectClosure (RaftError .EBUSY , latch ));
1795- waitLatch (latch );
1797+ SynchronizedClosure addPeer3Done = new SynchronizedClosure ();
1798+ leader .addPeer (peer3 .getPeerId (), 8L , addPeer3Done );
1799+
1800+ Status addPeer2Status = assertTimeoutPreemptively (Duration .ofSeconds (30 ), addPeer2Done ::await );
1801+ assertTrue (addPeer2Status .isOk (), "re-add peer2 failed: " + addPeer2Status );
1802+
1803+ Status addPeer3Status = assertTimeoutPreemptively (Duration .ofSeconds (30 ), addPeer3Done ::await );
1804+ assertEquals (RaftError .EBUSY , addPeer3Status .getRaftError (),
1805+ "concurrent addPeer(peer3) should fail with EBUSY: " + addPeer3Status );
17961806
17971807 // re-add peer2 directly
17981808
17991809 try {
1800- leader .addPeer (peer2 .getPeerId (), 9L , new ExpectClosure ( latch ));
1810+ leader .addPeer (peer2 .getPeerId (), 9L , new SynchronizedClosure ( ));
18011811 fail ();
18021812 }
18031813 catch (IllegalArgumentException e ) {
@@ -2968,7 +2978,8 @@ public void testNodeWillNeverGetOutOfSnapshot() throws Exception {
29682978 // Start node C, it should install snapshot from leader.
29692979 log .info ("Start node [id={}]." , peers .get (2 ).getPeerId ().getConsistentId ());
29702980 assertTrue (cluster .start (peers .get (2 )));
2971- assertTrue (waitForCondition (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot (), 10_000 ));
2981+ await ().atMost (10 , TimeUnit .SECONDS )
2982+ .until (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
29722983
29732984 log .info ("Waiting for snapshot to start executing." );
29742985 assertThat (snapshotStartedFuture , willCompleteSuccessfully ());
@@ -2978,7 +2989,8 @@ public void testNodeWillNeverGetOutOfSnapshot() throws Exception {
29782989 cluster .stop (leader .getLeaderId ());
29792990 log .info ("Leader stopped." );
29802991
2981- assertTrue (cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
2992+ await ().atMost (10 , TimeUnit .SECONDS )
2993+ .until (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
29822994 }
29832995
29842996 /**
@@ -3043,7 +3055,8 @@ public void testNodeBlockedTimeoutNow() throws Exception {
30433055
30443056 return false ;
30453057 });
3046- assertTrue (waitForCondition (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot (), 10_000 ));
3058+ await ().atMost (10 , TimeUnit .SECONDS )
3059+ .until (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
30473060 // While snapshot is being installed, stop the leader.
30483061
30493062 log .info ("Waiting for snapshot to start executing." );
@@ -3053,9 +3066,8 @@ public void testNodeBlockedTimeoutNow() throws Exception {
30533066 cluster .stop (leader .getLeaderId ());
30543067 log .info ("Leader stopped." );
30553068
3056- Thread .sleep (30_000 );
3057-
3058- assertTrue (cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
3069+ await ().atMost (30 , TimeUnit .SECONDS )
3070+ .until (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
30593071 }
30603072
30613073 @ Test
@@ -3110,13 +3122,15 @@ public void testReelectionWithTimeoutNow() throws Exception {
31103122 log .info ("Start node [id={}]." , peers .get (2 ).getPeerId ().getConsistentId ());
31113123 assertTrue (cluster .start (peers .get (2 )));
31123124
3113- assertTrue (waitForCondition (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot (), 10_000 ));
3125+ await ().atMost (10 , TimeUnit .SECONDS )
3126+ .until (() -> cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
31143127 // While snapshot is being installed, stop the leader.
31153128 log .info ("Stopping leader [id={}]." , leader .getLeaderId ());
31163129 cluster .stop (leader .getLeaderId ());
31173130 log .info ("Leader stopped." );
31183131
3119- assertFalse (cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
3132+ await ().timeout (10 , TimeUnit .SECONDS )
3133+ .until (() -> !cluster .getNode (peers .get (2 ).getPeerId ()).isInstallingSnapshot ());
31203134 }
31213135
31223136 private void tapIntoSnapshotCopier (
@@ -4127,6 +4141,39 @@ private Future<?> startChangePeersAndLearnersThread(ChangeArg arg) {
41274141 });
41284142 }
41294143
4144+ private void changePeersAndLearnersWithRetry (Configuration conf , long timeoutMillis ) throws InterruptedException {
4145+ Status lastStatus = null ;
4146+ long deadlineNanos = System .nanoTime () + TimeUnit .MILLISECONDS .toNanos (timeoutMillis );
4147+
4148+ while (System .nanoTime () < deadlineNanos ) {
4149+ Node leader = cluster .waitAndGetLeader ();
4150+
4151+ if (leader == null ) {
4152+ continue ;
4153+ }
4154+
4155+ SynchronizedClosure done = new SynchronizedClosure ();
4156+
4157+ leader .changePeersAndLearners (conf , leader .getCurrentTerm (), done );
4158+
4159+ lastStatus = done .await ();
4160+
4161+ if (lastStatus .isOk ()) {
4162+ return ;
4163+ }
4164+
4165+ RaftError error = lastStatus .getRaftError ();
4166+
4167+ if (error != RaftError .EBUSY && error != RaftError .EPERM && error != RaftError .ECATCHUP ) {
4168+ break ;
4169+ }
4170+
4171+ Thread .sleep (100 );
4172+ }
4173+
4174+ assertTrue (lastStatus != null && lastStatus .isOk (), String .valueOf (lastStatus ));
4175+ }
4176+
41304177 @ Test
41314178 public void testChangePeersAndLearnersChaosWithSnapshot () throws Exception {
41324179 // start cluster
@@ -4161,11 +4208,7 @@ public void testChangePeersAndLearnersChaosWithSnapshot() throws Exception {
41614208 }
41624209 arg .stop = true ;
41634210 future .get ();
4164- SynchronizedClosure done = new SynchronizedClosure ();
4165- Node leader = cluster .waitAndGetLeader ();
4166- leader .changePeersAndLearners (new Configuration (peers .stream ().map (TestPeer ::getPeerId ).collect (toList ())), leader .getCurrentTerm (), done );
4167- Status st = done .await ();
4168- assertTrue (st .isOk (), st .getErrorMsg ());
4211+ changePeersAndLearnersWithRetry (new Configuration (peers .stream ().map (TestPeer ::getPeerId ).collect (toList ())), 10_000 );
41694212 cluster .ensureSame ();
41704213 assertEquals (10 , cluster .getFsms ().size ());
41714214 for (MockStateMachine fsm : cluster .getFsms ())
@@ -4207,10 +4250,7 @@ public void testChangePeersAndLearnersChaosWithoutSnapshot() throws Exception {
42074250 }
42084251 arg .stop = true ;
42094252 future .get ();
4210- SynchronizedClosure done = new SynchronizedClosure ();
4211- Node leader = cluster .waitAndGetLeader ();
4212- leader .changePeersAndLearners (new Configuration (peers .stream ().map (TestPeer ::getPeerId ).collect (toList ())), leader .getCurrentTerm (), done );
4213- assertTrue (done .await ().isOk ());
4253+ changePeersAndLearnersWithRetry (new Configuration (peers .stream ().map (TestPeer ::getPeerId ).collect (toList ())), 10_000 );
42144254 cluster .ensureSame ();
42154255 assertEquals (10 , cluster .getFsms ().size ());
42164256 for (MockStateMachine fsm : cluster .getFsms ()) {
@@ -4280,10 +4320,7 @@ public void testChangePeersAndLearnersChaosApplyTasks() throws Exception {
42804320 for (Future <?> future : futures )
42814321 future .get ();
42824322
4283- SynchronizedClosure done = new SynchronizedClosure ();
4284- Node leader = cluster .waitAndGetLeader ();
4285- leader .changePeersAndLearners (new Configuration (peers .stream ().map (TestPeer ::getPeerId ).collect (toList ())), leader .getCurrentTerm (), done );
4286- assertTrue (done .await ().isOk ());
4323+ changePeersAndLearnersWithRetry (new Configuration (peers .stream ().map (TestPeer ::getPeerId ).collect (toList ())), 10_000 );
42874324 cluster .ensureSame ();
42884325 assertEquals (10 , cluster .getFsms ().size ());
42894326
0 commit comments