Skip to content

Commit a37621b

Browse files
authored
Fix duplicate peer in createNewRegionPeer; harden reconstruct IT
1 parent 4ac0b7e commit a37621b

3 files changed

Lines changed: 42 additions & 9 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,4 +736,23 @@ protected static Map<Integer, String> getRegionStatusWithoutRunning(Session sess
736736
}
737737
return result;
738738
}
739+
740+
/**
 * Runs {@code show regions} and returns regionId -> dataNodeId -> status, built from the
 * {@code RegionId}, {@code DataNodeId} and {@code Status} columns of the result.
 *
 * <p>Both map levels are {@link TreeMap}s, so iteration is ordered by id. If the same
 * (regionId, dataNodeId) pair shows up in more than one row, the last row read wins.
 *
 * @param session session used to execute the query
 * @return nested map of region id to data node id to the status text of that row
 * @throws IoTDBConnectionException propagated from the session / data set calls
 * @throws StatementExecutionException propagated from the session / data set calls
 */
741+
protected static Map<Integer, Map<Integer, String>> getRegionStatusMap(Session session)
742+
throws IoTDBConnectionException, StatementExecutionException {
743+
// NOTE(review): dataSet is never closed in this method — confirm whether SessionDataSet
// needs an explicit close to release the server-side query.
SessionDataSet dataSet = session.executeQueryStatement("show regions");
744+
// Columns are located by header name rather than by fixed position.
// NOTE(review): presumably getColumnNames() is a List, so indexOf yields -1 for a missing
// header and the fields.get(...) calls below would then throw — verify.
final int regionIdIndex = dataSet.getColumnNames().indexOf("RegionId");
745+
final int dataNodeIdIndex = dataSet.getColumnNames().indexOf("DataNodeId");
746+
final int regionStatusIndex = dataSet.getColumnNames().indexOf("Status");
747+
dataSet.setFetchSize(1024);
748+
Map<Integer, Map<Integer, String>> result = new TreeMap<>();
749+
while (dataSet.hasNext()) {
750+
List<Field> fields = dataSet.next().getFields();
751+
final int regionId = fields.get(regionIdIndex).getIntV();
752+
final int dataNodeId = fields.get(dataNodeIdIndex).getIntV();
753+
// The status is kept as the field's string form, not parsed into an enum.
final String regionStatus = fields.get(regionStatusIndex).toString();
754+
// Create the per-region inner map on first sight of the region, then record this peer.
result.computeIfAbsent(regionId, k -> new TreeMap<>()).put(dataNodeId, regionStatus);
755+
}
756+
return result;
757+
}
739758
}

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBRegionReconstructForIoTV1IT.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.io.File;
4747
import java.sql.Connection;
4848
import java.sql.Statement;
49+
import java.util.Collections;
4950
import java.util.Map;
5051
import java.util.Set;
5152
import java.util.concurrent.TimeUnit;
@@ -108,7 +109,9 @@ public void normal1C3DTest() throws Exception {
108109
Assert.assertTrue(dataRegionMap.containsKey(selectedRegion));
109110
Pair<Integer, Set<Integer>> leaderAndNodeIds = dataRegionMap.get(selectedRegion);
110111
Assert.assertEquals(2, leaderAndNodeIds.right.size());
111-
// reconstruct from the leader to ensure no data is lost
112+
// Stop the current leader; reconstruct will later target the surviving follower (whose
113+
// tsfiles get deleted below). When the original leader is restarted, the follower's
114+
// missing data is replicated back from it, so no committed data is lost.
112115
final int dataNodeToBeClosed = leaderAndNodeIds.left;
113116
final int dataNodeToBeReconstructed =
114117
leaderAndNodeIds.right.stream().filter(x -> x != dataNodeToBeClosed).findAny().get();
@@ -172,19 +175,25 @@ public void normal1C3DTest() throws Exception {
172175
EnvFactory.getAbstractEnv().checkNodeInStatus(dataNodeToBeClosed, NodeStatus.Running);
173176
session.executeNonQueryStatement(
174177
String.format(RECONSTRUCT_FORMAT, selectedRegion, dataNodeToBeReconstructed));
178+
// Confirm reconstruct succeeded: the selected region must contain both the reconstructed
179+
// peer and the (formerly closed) peer, with both rows reporting Running.
175180
try {
176181
Awaitility.await()
177182
.pollInterval(1, TimeUnit.SECONDS)
178183
.atMost(10, TimeUnit.MINUTES)
179184
.until(
180-
() ->
181-
getRegionStatusWithoutRunning(session).isEmpty()
182-
&& dataDirToBeReconstructed.getAbsoluteFile().exists());
185+
() -> {
186+
Map<Integer, String> peerStatus =
187+
getRegionStatusMap(session)
188+
.getOrDefault(selectedRegion, Collections.emptyMap());
189+
return "Running".equals(peerStatus.get(dataNodeToBeReconstructed))
190+
&& "Running".equals(peerStatus.get(dataNodeToBeClosed));
191+
});
183192
} catch (Exception e) {
184193
LOGGER.error(
185-
"Two factor: {} && {}",
186-
getRegionStatusWithoutRunning(session),
187-
dataDirToBeReconstructed.getAbsoluteFile().exists());
194+
"Reconstruct did not finish in time. region {} status map: {}",
195+
selectedRegion,
196+
getRegionStatusMap(session).get(selectedRegion));
188197
fail();
189198
}
190199
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeToBeClosed).get().stopForcibly();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,14 @@ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocatio
175175
if (TConsensusGroupType.DataRegion.equals(regionId.getType())
176176
&& (IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())
177177
|| IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass()))) {
178-
// parameter of createPeer for MultiLeader should be all peers
178+
// parameter of createPeer for MultiLeader should be all peers; callers (e.g.
179+
// AddRegionPeerProcedure) may have already inserted destDataNode into the partition
180+
// table before reaching here, so append only when not already present to avoid
181+
// sending a peer list with duplicates.
179182
currentPeerNodes = new ArrayList<>(regionReplicaNodes);
180-
currentPeerNodes.add(destDataNode);
183+
if (!currentPeerNodes.contains(destDataNode)) {
184+
currentPeerNodes.add(destDataNode);
185+
}
181186
} else {
182187
// parameter of createPeer for Ratis can be empty
183188
currentPeerNodes = Collections.emptyList();

0 commit comments

Comments
 (0)