Skip to content

Commit bfafd9b

Browse files
committed
test: added new raft rejoin test
1 parent 9fd36f8 commit bfafd9b

1 file changed

Lines changed: 250 additions & 0 deletions

File tree

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/*
2+
* Copyright 2021-present Arcade Data Ltd (info@arcadedata.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com)
17+
* SPDX-License-Identifier: Apache-2.0
18+
*/
19+
package com.arcadedb.server.ha.raft;
20+
21+
import com.arcadedb.ContextConfiguration;
22+
import com.arcadedb.GlobalConfiguration;
23+
import com.arcadedb.graph.MutableVertex;
24+
import com.arcadedb.log.LogManager;
25+
import com.arcadedb.server.ArcadeDBServer;
26+
import com.arcadedb.utility.CodeUtils;
27+
import org.junit.jupiter.api.Tag;
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.logging.Level;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
34+
/**
35+
* Regression test for issue #4081.
36+
* <p>
37+
* Reproduces the user's reported scenario: a 4-node cluster where node 0 has the
38+
* highest leader-election priority and node 3 is a replica. After:
39+
* <ol>
40+
* <li>Killing the priority leader (node 0)</li>
41+
* <li>Restarting it (Raft storage is wiped because {@code raftPersistStorage=false})</li>
42+
* <li>Letting it become leader again via priority</li>
43+
* <li>Stopping the replica (node 3)</li>
44+
* <li>Restarting the replica (Raft storage is wiped)</li>
45+
* </ol>
46+
* the leader must not get stuck in an INCONSISTENCY/append-entries loop. The cluster must
47+
* eventually converge: writes succeed and the rejoined replica catches up.
48+
*
49+
* @author Luca Garulli (l.garulli@arcadedata.com)
50+
*/
51+
@Tag("slow")
52+
class RaftPriorityRejoinIT extends BaseRaftHATest {
53+
54+
private static final int BASE_RAFT_PORT = 2434;
55+
private static final int BASE_HTTP_PORT = 2480;
56+
57+
@Override
58+
protected int getServerCount() {
59+
return 4;
60+
}
61+
62+
@Override
63+
protected boolean persistentRaftStorage() {
64+
// The user's setup uses default (false). Storage is wiped on every restart.
65+
return false;
66+
}
67+
68+
@Override
69+
protected String getServerAddresses() {
70+
// Mirror the user's configuration: priorities 10/8/6/0, last node is replica role.
71+
final int[] priorities = { 10, 8, 6, 0 };
72+
final StringBuilder sb = new StringBuilder();
73+
for (int i = 0; i < getServerCount(); i++) {
74+
if (i > 0)
75+
sb.append(",");
76+
sb.append("localhost:").append(BASE_RAFT_PORT + i)
77+
.append(":").append(BASE_HTTP_PORT + i)
78+
.append(":").append(priorities[i]);
79+
}
80+
return sb.toString();
81+
}
82+
83+
@Override
84+
protected void onServerConfiguration(final ContextConfiguration config) {
85+
super.onServerConfiguration(config);
86+
config.setValue(GlobalConfiguration.HA_QUORUM, "majority");
87+
// Mirror the user's adb*.sh: quorumTimeout=1000ms.
88+
config.setValue(GlobalConfiguration.HA_QUORUM_TIMEOUT, 1_000L);
89+
90+
// Tag the last server as replica role (mirrors the user's adb4.sh).
91+
final String serverName = config.getValueAsString(GlobalConfiguration.SERVER_NAME);
92+
final int idx = Integer.parseInt(serverName.substring(serverName.lastIndexOf('_') + 1));
93+
if (idx == getServerCount() - 1)
94+
config.setValue(GlobalConfiguration.HA_SERVER_ROLE, "replica");
95+
96+
// Keep election timeouts small so the test runs in reasonable time.
97+
config.setValue(GlobalConfiguration.HA_ELECTION_TIMEOUT_MIN, 1_000);
98+
config.setValue(GlobalConfiguration.HA_ELECTION_TIMEOUT_MAX, 2_000);
99+
}
100+
101+
@Test
102+
void leaderRestartThenReplicaRestartConverges() {
103+
// Step 1: write some data with all nodes online.
104+
final int firstLeader = findLeaderIndex();
105+
assertThat(firstLeader).as("a Raft leader must be elected at startup").isGreaterThanOrEqualTo(0);
106+
107+
final var leaderDb = getServerDatabase(firstLeader, getDatabaseName());
108+
leaderDb.transaction(() -> {
109+
if (!leaderDb.getSchema().existsType("Issue4081"))
110+
leaderDb.getSchema().createVertexType("Issue4081");
111+
});
112+
113+
// Many small transactions so the Raft log has plenty of entries to replicate.
114+
for (int t = 0; t < 50; t++) {
115+
final int base = t * 10;
116+
leaderDb.transaction(() -> {
117+
for (int i = 0; i < 10; i++) {
118+
final MutableVertex v = leaderDb.newVertex("Issue4081");
119+
v.set("phase", "initial");
120+
v.set("idx", base + i);
121+
v.save();
122+
}
123+
});
124+
}
125+
126+
assertClusterConsistency();
127+
128+
// Step 2: kill the priority-leader (server 0).
129+
LogManager.instance().log(this, Level.INFO, "TEST: stopping priority leader (server 0)");
130+
getServer(0).stop();
131+
while (getServer(0).getStatus() == ArcadeDBServer.STATUS.SHUTTING_DOWN)
132+
CodeUtils.sleep(200);
133+
134+
// Step 3: wait for a new leader among the remaining nodes.
135+
final int interimLeader = waitForAnyLeader(0);
136+
assertThat(interimLeader).as("a new leader must be elected after stopping server 0").isGreaterThanOrEqualTo(0);
137+
assertThat(interimLeader).isNotEqualTo(0);
138+
139+
// Write more so the interim leader's log advances beyond the killed leader's snapshot point.
140+
final var interimDb = getServerDatabase(interimLeader, getDatabaseName());
141+
for (int t = 0; t < 30; t++) {
142+
final int base = t * 10;
143+
interimDb.transaction(() -> {
144+
for (int i = 0; i < 10; i++) {
145+
final MutableVertex v = interimDb.newVertex("Issue4081");
146+
v.set("phase", "after-leader-stop");
147+
v.set("idx", base + i);
148+
v.save();
149+
}
150+
});
151+
}
152+
153+
// Step 4: restart server 0. Its Raft storage was wiped (persistStorage=false) so it joins fresh.
154+
LogManager.instance().log(this, Level.INFO, "TEST: restarting server 0");
155+
getServer(0).start();
156+
157+
// Step 5: with priority 10, server 0 should become the leader again. Wait for that.
158+
final int reLeader = waitForLeader(0, 60_000);
159+
assertThat(reLeader).as("server 0 should reclaim leadership via priority").isEqualTo(0);
160+
161+
// Confirm the cluster is functional with server 0 as leader.
162+
final var reLeaderDb = getServerDatabase(0, getDatabaseName());
163+
for (int t = 0; t < 30; t++) {
164+
final int base = t * 10;
165+
reLeaderDb.transaction(() -> {
166+
for (int i = 0; i < 10; i++) {
167+
final MutableVertex v = reLeaderDb.newVertex("Issue4081");
168+
v.set("phase", "after-leader-rejoin");
169+
v.set("idx", base + i);
170+
v.save();
171+
}
172+
});
173+
}
174+
175+
// Step 6: stop the replica (server 3).
176+
LogManager.instance().log(this, Level.INFO, "TEST: stopping replica (server 3)");
177+
getServer(3).stop();
178+
while (getServer(3).getStatus() == ArcadeDBServer.STATUS.SHUTTING_DOWN)
179+
CodeUtils.sleep(200);
180+
181+
// Write more so the leader's log advances while the replica is offline.
182+
// Aim for enough entries to cross the snapshot threshold so the leader's log gets purged.
183+
for (int t = 0; t < 30; t++) {
184+
final int base = t * 10;
185+
reLeaderDb.transaction(() -> {
186+
for (int i = 0; i < 10; i++) {
187+
final MutableVertex v = reLeaderDb.newVertex("Issue4081");
188+
v.set("phase", "while-replica-down");
189+
v.set("idx", base + i);
190+
v.save();
191+
}
192+
});
193+
}
194+
195+
// Step 7: restart the replica. Storage is wiped so it joins as a fresh peer.
196+
LogManager.instance().log(this, Level.INFO, "TEST: restarting replica (server 3)");
197+
getServer(3).start();
198+
199+
// Step 8: the replica must catch up. The leader must NOT get stuck in an INCONSISTENCY loop.
200+
waitForReplicationIsCompleted(3);
201+
202+
// Final write to confirm the cluster is fully operational.
203+
for (int t = 0; t < 30; t++) {
204+
final int base = t * 10;
205+
reLeaderDb.transaction(() -> {
206+
for (int i = 0; i < 10; i++) {
207+
final MutableVertex v = reLeaderDb.newVertex("Issue4081");
208+
v.set("phase", "final");
209+
v.set("idx", base + i);
210+
v.save();
211+
}
212+
});
213+
}
214+
215+
assertClusterConsistency();
216+
217+
// Every node must hold all 1700 vertices (50+30+30+30+30 transactions x 10 vertices each).
218+
final long expected = (50 + 30 + 30 + 30 + 30) * 10L;
219+
for (int i = 0; i < getServerCount(); i++) {
220+
final long count = getServerDatabase(i, getDatabaseName()).countType("Issue4081", true);
221+
assertThat(count).as("server " + i + " should have all " + expected + " records").isEqualTo(expected);
222+
}
223+
}
224+
225+
private int waitForAnyLeader(final int excludeIndex) {
226+
final long deadline = System.currentTimeMillis() + 30_000;
227+
while (System.currentTimeMillis() < deadline) {
228+
for (int i = 0; i < getServerCount(); i++) {
229+
if (i == excludeIndex)
230+
continue;
231+
final RaftHAPlugin plugin = getRaftPlugin(i);
232+
if (plugin != null && plugin.isLeader())
233+
return i;
234+
}
235+
CodeUtils.sleep(250);
236+
}
237+
return -1;
238+
}
239+
240+
private int waitForLeader(final int expectedIndex, final long timeoutMs) {
241+
final long deadline = System.currentTimeMillis() + timeoutMs;
242+
while (System.currentTimeMillis() < deadline) {
243+
final RaftHAPlugin plugin = getRaftPlugin(expectedIndex);
244+
if (plugin != null && plugin.isLeader())
245+
return expectedIndex;
246+
CodeUtils.sleep(500);
247+
}
248+
return -1;
249+
}
250+
}

0 commit comments

Comments
 (0)