Skip to content

Commit df2a302

Browse files
authored
RATIS-2499. Allow the LogAppender restart when LogAppenderDaemon exception (#1425)
1 parent 32e7925 commit df2a302

4 files changed

Lines changed: 79 additions & 3 deletions

File tree

ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -125,7 +125,11 @@ public void start() {
125125
@Override
126126
public boolean isRunning() {
127127
return daemon.isWorking()
128-
&& server.getInfo().isAlive()
128+
&& isLeaderAlive();
129+
}
130+
131+
private boolean isLeaderAlive() {
132+
return server.getInfo().isAlive()
129133
&& server.getInfo().isLeader()
130134
&& getRaftLog().isOpened();
131135
}
@@ -136,8 +140,12 @@ public CompletableFuture<LifeCycle.State> stopAsync() {
136140
}
137141

138142
void restart() {
139-
if (!isRunning()) {
140-
LOG.warn("{} is not running: skipping restart", this);
143+
if (daemon.isClosingOrClosed()) {
144+
LOG.warn("{}: daemon is closing or closed, skipping restart", this);
145+
return;
146+
}
147+
if (!isLeaderAlive()) {
148+
LOG.warn("{}: leader is not ready, skipping restart", this);
141149
return;
142150
}
143151
getLeaderState().restart(this);

ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,10 @@ public boolean isWorking() {
5858
return !LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(lifeCycle.getCurrentState());
5959
}
6060

61+
public boolean isClosingOrClosed() {
62+
return LifeCycle.States.CLOSING_OR_CLOSED.contains(lifeCycle.getCurrentState());
63+
}
64+
6165
public void tryToStart() {
6266
if (lifeCycle.compareAndTransition(NEW, STARTING)) {
6367
daemon.start();

ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -171,6 +171,7 @@ public void testFollowerHeartbeatMetric() throws IOException, InterruptedExcepti
171171
assertTrue(t.getTimer().getCount() > 0L);
172172
}
173173
}
174+
cluster.shutdown();
174175
}
175176

176177
void runTest(CLUSTER cluster) throws Exception {

ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java

Lines changed: 63 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,10 @@
1818
package org.apache.ratis.grpc;
1919

2020
import org.apache.ratis.LogAppenderTests;
21+
import org.apache.ratis.grpc.server.GrpcServicesImpl;
2122
import org.apache.ratis.proto.RaftProtos;
23+
import org.apache.ratis.protocol.RaftGroupId;
24+
import org.apache.ratis.protocol.RaftPeerId;
2225
import org.apache.ratis.server.impl.MiniRaftCluster;
2326
import org.apache.ratis.RaftTestUtil;
2427
import org.apache.ratis.client.RaftClient;
@@ -29,11 +32,14 @@
2932
import org.apache.ratis.server.RaftServerConfigKeys;
3033
import org.apache.ratis.server.leader.FollowerInfo;
3134
import org.apache.ratis.server.impl.RaftServerTestUtil;
35+
import org.apache.ratis.server.leader.LogAppender;
3236
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
3337
import org.apache.ratis.statemachine.StateMachine;
38+
import org.apache.ratis.util.CodeInjectionForTesting;
3439
import org.apache.ratis.util.JavaUtils;
3540
import org.apache.ratis.util.Slf4jUtils;
3641
import org.junit.jupiter.api.Assertions;
42+
import org.junit.jupiter.api.Test;
3743
import org.junit.jupiter.params.ParameterizedTest;
3844
import org.junit.jupiter.params.provider.MethodSource;
3945
import org.slf4j.event.Level;
@@ -42,7 +48,10 @@
4248
import java.util.ArrayList;
4349
import java.util.Arrays;
4450
import java.util.Collection;
51+
import java.util.Set;
4552
import java.util.concurrent.CompletableFuture;
53+
import java.util.concurrent.atomic.AtomicInteger;
54+
import java.util.stream.Collectors;
4655

4756
import static org.apache.ratis.RaftTestUtil.waitForLeader;
4857

@@ -148,4 +157,58 @@ private void runTestRestartLogAppender(MiniRaftClusterWithGrpc cluster) throws E
148157
Assertions.assertTrue(newleaderMetrics.getRegistry().counter(counter).getCount() >= 1L);
149158
}
150159
}
160+
161+
@Test
162+
public void testLogAppenderAutoRestartOnException() throws Exception {
163+
runWithNewCluster(3, this::runTestAutoRestartOnException);
164+
}
165+
166+
private void runTestAutoRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exception {
167+
final RaftServer.Division leader = waitForLeader(cluster);
168+
final RaftPeerId leaderId = leader.getId();
169+
170+
try (RaftClient client = cluster.createClient(leaderId)) {
171+
for (int i = 0; i < 5; i++) {
172+
Assertions.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("init-" + i)).isSuccess());
173+
}
174+
}
175+
176+
final Set<LogAppender> before = RaftServerTestUtil.getLogAppenders(leader).collect(Collectors.toSet());
177+
Assertions.assertEquals(2, before.size());
178+
179+
// Inject a one-time IllegalStateException into the leader's AppendEntries send path.
180+
// This causes the LogAppenderDaemon to enter EXCEPTION state and call restart().
181+
final RaftGroupId groupId = cluster.getGroupId();
182+
final AtomicInteger failCount = new AtomicInteger(0);
183+
try {
184+
CodeInjectionForTesting.put(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST, (localId, remoteId, args) -> {
185+
if (leaderId.equals(localId)
186+
&& args.length > 0 && args[0] instanceof RaftProtos.AppendEntriesRequestProto) {
187+
final RaftProtos.AppendEntriesRequestProto proto = (RaftProtos.AppendEntriesRequestProto) args[0];
188+
if (RaftGroupId.valueOf(proto.getServerRequest().getRaftGroupId().getId()).equals(groupId)
189+
&& failCount.getAndIncrement() < 1) {
190+
throw new IllegalStateException("Injected failure for restart test");
191+
}
192+
}
193+
return false;
194+
});
195+
196+
JavaUtils.attempt(() -> {
197+
final Set<LogAppender> current = RaftServerTestUtil.getLogAppenders(leader)
198+
.collect(Collectors.toSet());
199+
Assertions.assertEquals(2, current.size());
200+
Assertions.assertTrue(current.stream().anyMatch(a -> !before.contains(a)),
201+
"Expected at least one new LogAppender instance after daemon exception restart");
202+
}, 30, ONE_SECOND, "LogAppender auto-restart after exception", LOG);
203+
} finally {
204+
CodeInjectionForTesting.remove(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST);
205+
}
206+
207+
try (RaftClient client = cluster.createClient(leaderId)) {
208+
for (int i = 0; i < 5; i++) {
209+
Assertions.assertTrue(
210+
client.io().send(new RaftTestUtil.SimpleMessage("after-restart-" + i)).isSuccess());
211+
}
212+
}
213+
}
151214
}

0 commit comments

Comments
 (0)