Skip to content

Commit 371fbfe

Browse files
authored
RATIS-1305. Leader stuck in infinite install snapshot cycle when logs have been purged (#420). Contributed by Ethan Rose
1 parent aa6c5ff commit 371fbfe

2 files changed

Lines changed: 113 additions & 3 deletions

File tree

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,8 +562,16 @@ private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
    * @return the first available log's start term index
    */
   private TermIndex shouldNotifyToInstallSnapshot() {
+    final long followerNextIndex = getFollower().getNextIndex();
+    final long leaderNextIndex = getRaftLog().getNextIndex();
+
+    if (followerNextIndex >= leaderNextIndex) {
+      return null;
+    }
+
     final long leaderStartIndex = getRaftLog().getStartIndex();
-    if (getFollower().getNextIndex() < leaderStartIndex) {
+
+    if (followerNextIndex < leaderStartIndex) {
       // The Leader does not have the logs from the Follower's last log
       // index onwards. And install snapshot is disabled. So the Follower
       // should be notified to install the latest snapshot through its
@@ -572,8 +580,9 @@ private TermIndex shouldNotifyToInstallSnapshot() {
     } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
       // Leader has no logs to check from, hence return next index.
       return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(),
-          getRaftLog().getNextIndex());
+          leaderNextIndex);
     }
+
     return null;
   }
 

ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
-import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -51,6 +50,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftCluster>
@@ -79,11 +79,16 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
   private static final int PURGE_GAP = 8;
   private static final AtomicReference<SnapshotInfo> leaderSnapshotInfoRef = new AtomicReference<>();
 
+  private static final AtomicInteger numSnapshotRequests = new AtomicInteger();
+
   private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing {
     @Override
     public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
         RaftProtos.RoleInfoProto roleInfoProto,
         TermIndex termIndex) {
+
+      numSnapshotRequests.incrementAndGet();
+
       final SingleFileSnapshotInfo leaderSnapshotInfo = (SingleFileSnapshotInfo) leaderSnapshotInfoRef.get();
       LOG.info("{}: leaderSnapshotInfo = {}", getId(), leaderSnapshotInfo);
       if (leaderSnapshotInfo == null) {
@@ -236,6 +241,102 @@ private void testRestartFollower(CLUSTER cluster) throws Exception {
       Assert.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
       Assert.assertEquals(newLeaderNextIndex, follower.getRaftLog().getNextIndex());
     }, 10, ONE_SECOND, "followerNextIndex", LOG);
+  }
+
+  @Test
+  public void testInstallSnapshotNotificationCount() throws Exception {
+    runWithNewCluster(3, this::testInstallSnapshotNotificationCount);
+  }
+
+
+  private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Exception {
+    leaderSnapshotInfoRef.set(null);
+    numSnapshotRequests.set(0);
+
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
 
+      // Let a few heartbeats pass.
+      ONE_SECOND.sleep();
+      Assert.assertEquals(0, numSnapshotRequests.get());
+
+      // Generate data.
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < 10; i++) {
+          RaftClientReply
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // Take snapshot and check result.
+      long snapshotIndex = cluster.getLeader().getStateMachine().takeSnapshot();
+      Assert.assertEquals(20, snapshotIndex);
+      final SnapshotInfo leaderSnapshotInfo = cluster.getLeader().getStateMachine().getLatestSnapshot();
+      Assert.assertEquals(20, leaderSnapshotInfo.getIndex());
+      final boolean set = leaderSnapshotInfoRef.compareAndSet(null, leaderSnapshotInfo);
+      Assert.assertTrue(set);
+
+      // Wait for the snapshot to be done.
+      final RaftServer.Division leader = cluster.getLeader();
+      final long nextIndex = leader.getRaftLog().getNextIndex();
+      Assert.assertEquals(21, nextIndex);
+      // End index is exclusive.
+      final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+          0, nextIndex);
+      JavaUtils.attemptRepeatedly(() -> {
+        Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+        return null;
+      }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+
+      // Clear all log files and reset cached log start index.
+      long snapshotInstallIndex =
+          leader.getRaftLog().onSnapshotInstalled(leader.getRaftLog().getLastCommittedIndex()).get();
+      Assert.assertEquals(20, snapshotInstallIndex);
+
+      // Check that logs are gone.
+      Assert.assertEquals(0,
+          LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage()).size());
+      Assert.assertEquals(RaftLog.INVALID_LOG_INDEX, leader.getRaftLog().getStartIndex());
+
+      // Allow some heartbeats to go through, then make sure none of them had
+      // snapshot requests.
+      ONE_SECOND.sleep();
+      Assert.assertEquals(0, numSnapshotRequests.get());
+
+      // Make sure leader and followers are still up to date.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        Assert.assertEquals(
+            leader.getRaftLog().getNextIndex(),
+            follower.getRaftLog().getNextIndex());
+      }
+
+      // Add two more peers who will need snapshots from the leader.
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+      RaftServerTestUtil
+          .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+      // Generate more data.
+      try (final RaftClient client = cluster.createClient(leader.getId())) {
+        Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" + i)).isSuccess());
+      }
+
+      // Make sure leader and followers are still up to date.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        Assert.assertEquals(
+            leader.getRaftLog().getNextIndex(),
+            follower.getRaftLog().getNextIndex());
+      }
+
+      // Make sure each new peer got one snapshot notification.
+      Assert.assertEquals(2, numSnapshotRequests.get());
+
+    } finally {
+      cluster.shutdown();
+    }
   }
 }

0 commit comments

Comments
 (0)