Skip to content

Commit 859c719

Browse files
authored
Wait for consensus start before answering region requests (#17546)
1 parent a15c733 commit 859c719

9 files changed

Lines changed: 663 additions & 8 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
5151
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
5252
import org.apache.iotdb.commons.cluster.NodeStatus;
53+
import org.apache.iotdb.commons.concurrent.Await;
54+
import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
5355
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
5456
import org.apache.iotdb.commons.concurrent.ThreadName;
5557
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
@@ -193,6 +195,7 @@
193195
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
194196
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
195197
import org.apache.iotdb.db.service.DataNode;
198+
import org.apache.iotdb.db.service.DataNode.DataNodeContext;
196199
import org.apache.iotdb.db.service.RegionMigrateService;
197200
import org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService;
198201
import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -416,6 +419,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
416419

417420
private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
418421

422+
private final DataNodeContext dataNodeContext;
423+
419424
private final ExecutorService schemaExecutor =
420425
new WrappedThreadPoolExecutor(
421426
0,
@@ -430,10 +435,33 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
430435

431436
private static final String SYSTEM = "system";
432437

433-
public DataNodeInternalRPCServiceImpl() {
438+
/**
 * Creates the internal RPC handler.
 *
 * @param dataNodeContext context whose {@code isAllConsensusStarted()} is consulted before
 *     region-management requests are served
 */
public DataNodeInternalRPCServiceImpl(DataNodeContext dataNodeContext) {
  super();
  // Store the context first; the fetcher singletons below do not depend on it.
  this.dataNodeContext = dataNodeContext;
  partitionFetcher = ClusterPartitionFetcher.getInstance();
  schemaFetcher = ClusterSchemaFetcher.getInstance();
}
444+
445+
private long consensusWaitTimeoutSeconds = 30;
446+
447+
private TSStatus waitForConsensusStarted() {
448+
if (dataNodeContext.isAllConsensusStarted()) {
449+
return null;
450+
}
451+
try {
452+
Await.await()
453+
.atMost(consensusWaitTimeoutSeconds, TimeUnit.SECONDS)
454+
.pollInterval(100, TimeUnit.MILLISECONDS)
455+
.until(dataNodeContext::isAllConsensusStarted);
456+
return null;
457+
} catch (AwaitTimeoutException e) {
458+
LOGGER.warn(
459+
"Consensus has not been started after {} seconds, rejecting region request",
460+
consensusWaitTimeoutSeconds);
461+
return RpcUtils.getStatus(
462+
TSStatusCode.CONSENSUS_NOT_INITIALIZED,
463+
"Consensus has not been started after " + consensusWaitTimeoutSeconds + " seconds");
464+
}
437465
}
438466

439467
@Override
@@ -624,11 +652,19 @@ private TLoadResp createTLoadResp(final TSStatus resultStatus) {
624652

625653
@Override
626654
public TSStatus createSchemaRegion(final TCreateSchemaRegionReq req) {
655+
TSStatus consensusStatus = waitForConsensusStarted();
656+
if (consensusStatus != null) {
657+
return consensusStatus;
658+
}
627659
return regionManager.createSchemaRegion(req.getRegionReplicaSet(), req.getStorageGroup());
628660
}
629661

630662
@Override
631663
public TSStatus createDataRegion(TCreateDataRegionReq req) {
664+
TSStatus consensusStatus = waitForConsensusStarted();
665+
if (consensusStatus != null) {
666+
return consensusStatus;
667+
}
632668
return regionManager.createDataRegion(req.getRegionReplicaSet(), req.getStorageGroup());
633669
}
634670

@@ -2616,6 +2652,10 @@ public TSStatus updateTemplate(final TUpdateTemplateReq req) {
26162652

26172653
@Override
26182654
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
2655+
TSStatus consensusStatus = waitForConsensusStarted();
2656+
if (consensusStatus != null) {
2657+
return consensusStatus;
2658+
}
26192659
ConsensusGroupId consensusGroupId =
26202660
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
26212661
if (consensusGroupId instanceof DataRegionId) {
@@ -2644,6 +2684,12 @@ public TRegionLeaderChangeResp changeRegionLeader(TRegionLeaderChangeReq req) {
26442684
LOGGER.info("[ChangeRegionLeader] {}", req);
26452685
TRegionLeaderChangeResp resp = new TRegionLeaderChangeResp();
26462686

2687+
TSStatus consensusStatus = waitForConsensusStarted();
2688+
if (consensusStatus != null) {
2689+
resp.setStatus(consensusStatus);
2690+
return resp;
2691+
}
2692+
26472693
TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
26482694
TConsensusGroupId tgId = req.getRegionId();
26492695
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
@@ -2713,6 +2759,10 @@ private boolean isLeader(ConsensusGroupId regionId) {
27132759

27142760
@Override
27152761
public TSStatus createNewRegionPeer(TCreatePeerReq req) {
2762+
TSStatus consensusStatus = waitForConsensusStarted();
2763+
if (consensusStatus != null) {
2764+
return consensusStatus;
2765+
}
27162766
ConsensusGroupId regionId =
27172767
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
27182768
List<Peer> peers =
@@ -2733,6 +2783,10 @@ public TSStatus createNewRegionPeer(TCreatePeerReq req) {
27332783

27342784
@Override
27352785
public TSStatus addRegionPeer(TMaintainPeerReq req) {
2786+
TSStatus consensusStatus = waitForConsensusStarted();
2787+
if (consensusStatus != null) {
2788+
return consensusStatus;
2789+
}
27362790
TConsensusGroupId regionId = req.getRegionId();
27372791
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
27382792
boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
@@ -2751,6 +2805,10 @@ public TSStatus addRegionPeer(TMaintainPeerReq req) {
27512805

27522806
@Override
27532807
public TSStatus removeRegionPeer(TMaintainPeerReq req) {
2808+
TSStatus consensusStatus = waitForConsensusStarted();
2809+
if (consensusStatus != null) {
2810+
return consensusStatus;
2811+
}
27542812
TConsensusGroupId regionId = req.getRegionId();
27552813
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
27562814
boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
@@ -2769,6 +2827,10 @@ public TSStatus removeRegionPeer(TMaintainPeerReq req) {
27692827

27702828
@Override
27712829
public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) {
2830+
TSStatus consensusStatus = waitForConsensusStarted();
2831+
if (consensusStatus != null) {
2832+
return consensusStatus;
2833+
}
27722834
TConsensusGroupId regionId = req.getRegionId();
27732835
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
27742836
boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req);
@@ -2788,6 +2850,10 @@ public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) {
27882850
// TODO: return which DataNode fail
27892851
@Override
27902852
public TSStatus resetPeerList(TResetPeerListReq req) throws TException {
2853+
TSStatus consensusStatus = waitForConsensusStarted();
2854+
if (consensusStatus != null) {
2855+
return consensusStatus;
2856+
}
27912857
return RegionMigrateService.getInstance().resetPeerList(req);
27922858
}
27932859

@@ -2798,6 +2864,10 @@ public TRegionMigrateResult getRegionMaintainResult(long taskId) throws TExcepti
27982864

27992865
@Override
28002866
public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws TException {
2867+
TSStatus consensusStatus = waitForConsensusStarted();
2868+
if (consensusStatus != null) {
2869+
return consensusStatus;
2870+
}
28012871
RegionMigrateService.getInstance().notifyRegionMigration(req);
28022872
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
28032873
}
@@ -3439,4 +3509,8 @@ private List<ByteBuffer> serializeDatabaseScopedTableList(
34393509

34403510
return result;
34413511
}
3512+
3513+
public void setConsensusWaitTimeoutSeconds(long consensusWaitTimeoutSeconds) {
3514+
this.consensusWaitTimeoutSeconds = consensusWaitTimeoutSeconds;
3515+
}
34423516
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,16 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
184184
private static final String REGISTER_INTERRUPTION =
185185
"Unexpected interruption when waiting to register to the cluster";
186186

187-
private boolean schemaRegionConsensusStarted = false;
188-
private boolean dataRegionConsensusStarted = false;
187+
private volatile boolean schemaRegionConsensusStarted = false;
188+
private volatile boolean dataRegionConsensusStarted = false;
189189
private static Thread watcherThread;
190+
private DataNodeContext context;
190191

191192
/**
 * Creates the DataNode and publishes it as the holder's singleton instance.
 *
 * <p>The context is created before {@code this} is stored in {@code DataNodeHolder.INSTANCE},
 * so the instance is not published while its {@code context} field is still unassigned.
 */
public DataNode() {
  super("DataNode");
  // We do not init anything here, so that we can re-initialize the instance in IT.
  // The context only reads this instance's consensus flags, so creating it is side-effect free.
  context = new DataNodeContext();
  DataNodeHolder.INSTANCE = this;
}
196198

197199
public static void reinitializeStatics() {
@@ -935,7 +937,9 @@ private void setUpRPCService() throws StartupException {
935937

936938
protected void registerInternalRPCService() throws StartupException {
937939
// Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
938-
registerManager.register(DataNodeInternalRPCService.getInstance());
940+
DataNodeInternalRPCService instance = DataNodeInternalRPCService.getInstance();
941+
instance.setDataNodeContext(context);
942+
registerManager.register(instance);
939943
}
940944

941945
// make it easier for users to extend ClientRPCServiceImpl to export more rpc services
@@ -1374,4 +1378,10 @@ private DataNodeHolder() {
13741378
// Empty constructor
13751379
}
13761380
}
1381+
1382+
public class DataNodeContext {
1383+
public boolean isAllConsensusStarted() {
1384+
return dataRegionConsensusStarted && schemaRegionConsensusStarted;
1385+
}
1386+
}
13771387
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3232
import org.apache.iotdb.db.protocol.thrift.handler.InternalServiceThriftHandler;
3333
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
34+
import org.apache.iotdb.db.service.DataNode.DataNodeContext;
3435
import org.apache.iotdb.db.service.metrics.DataNodeInternalRPCServiceMetrics;
3536
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService.Processor;
3637
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
@@ -44,6 +45,7 @@ public class DataNodeInternalRPCService extends ThriftService
4445
private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
4546

4647
private final AtomicReference<DataNodeInternalRPCServiceImpl> impl = new AtomicReference<>();
48+
private DataNodeContext dataNodeContext;
4749

4850
private DataNodeInternalRPCService() {}
4951

@@ -54,9 +56,9 @@ public ServiceType getID() {
5456

5557
@Override
5658
public void initTProcessor() {
57-
impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
59+
DataNodeInternalRPCServiceImpl service = getImpl();
5860
initSyncedServiceImpl(null);
59-
processor = new Processor<>(impl.get());
61+
processor = new Processor<>(service);
6062
}
6163

6264
@Override
@@ -109,7 +111,7 @@ public int getBindPort() {
109111
}
110112

111113
public DataNodeInternalRPCServiceImpl getImpl() {
112-
impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
114+
impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl(dataNodeContext));
113115
return impl.get();
114116
}
115117

@@ -122,4 +124,8 @@ private DataNodeInternalRPCServiceHolder() {}
122124
public static DataNodeInternalRPCService getInstance() {
123125
return DataNodeInternalRPCServiceHolder.INSTANCE;
124126
}
127+
128+
public void setDataNodeContext(DataNodeContext dataNodeContext) {
129+
this.dataNodeContext = dataNodeContext;
130+
}
125131
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.protocol.thrift.impl;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24+
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
25+
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
26+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
27+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
28+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
29+
import org.apache.iotdb.db.service.DataNode.DataNodeContext;
30+
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
31+
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
32+
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
33+
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
34+
import org.apache.iotdb.rpc.TSStatusCode;
35+
36+
import org.junit.Assert;
37+
import org.junit.BeforeClass;
38+
import org.junit.Test;
39+
import org.mockito.Mockito;
40+
41+
import java.util.Collections;
42+
43+
import static org.mockito.Mockito.when;
44+
45+
public class ConsensusWaitTest {
46+
47+
@BeforeClass
48+
public static void setUp() {
49+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
50+
}
51+
52+
private DataNodeInternalRPCServiceImpl createServiceWithConsensusState(boolean started) {
53+
DataNodeContext context = Mockito.mock(DataNodeContext.class);
54+
when(context.isAllConsensusStarted()).thenReturn(started);
55+
DataNodeInternalRPCServiceImpl service = new DataNodeInternalRPCServiceImpl(context);
56+
service.setConsensusWaitTimeoutSeconds(1);
57+
return service;
58+
}
59+
60+
private TCreateSchemaRegionReq createSchemaRegionReq() {
61+
TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
62+
req.setStorageGroup("root.test");
63+
TRegionReplicaSet replicaSet = new TRegionReplicaSet();
64+
replicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0));
65+
TDataNodeLocation location = new TDataNodeLocation();
66+
location.setDataNodeId(0);
67+
location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
68+
location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
69+
location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
70+
location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
71+
location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
72+
replicaSet.setDataNodeLocations(Collections.singletonList(location));
73+
req.setRegionReplicaSet(replicaSet);
74+
return req;
75+
}
76+
77+
private TCreateDataRegionReq createDataRegionReq() {
78+
TCreateDataRegionReq req = new TCreateDataRegionReq();
79+
req.setStorageGroup("root.test");
80+
TRegionReplicaSet replicaSet = new TRegionReplicaSet();
81+
replicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
82+
TDataNodeLocation location = new TDataNodeLocation();
83+
location.setDataNodeId(0);
84+
location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
85+
location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
86+
location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
87+
location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
88+
location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
89+
replicaSet.setDataNodeLocations(Collections.singletonList(location));
90+
req.setRegionReplicaSet(replicaSet);
91+
return req;
92+
}
93+
94+
@Test
95+
public void testCreateSchemaRegionRejectsWhenConsensusNotStarted() {
96+
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
97+
TSStatus status = service.createSchemaRegion(createSchemaRegionReq());
98+
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode());
99+
}
100+
101+
@Test
102+
public void testCreateDataRegionRejectsWhenConsensusNotStarted() {
103+
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
104+
TSStatus status = service.createDataRegion(createDataRegionReq());
105+
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode());
106+
}
107+
108+
@Test
109+
public void testDeleteRegionRejectsWhenConsensusNotStarted() {
110+
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
111+
TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
112+
TSStatus status = service.deleteRegion(groupId);
113+
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode());
114+
}
115+
116+
@Test
117+
public void testChangeRegionLeaderRejectsWhenConsensusNotStarted() {
118+
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
119+
TRegionLeaderChangeReq req = new TRegionLeaderChangeReq();
120+
req.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
121+
TDataNodeLocation newLeader = new TDataNodeLocation();
122+
newLeader.setDataNodeId(0);
123+
newLeader.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
124+
newLeader.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
125+
newLeader.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
126+
req.setNewLeaderNode(newLeader);
127+
TRegionLeaderChangeResp resp = service.changeRegionLeader(req);
128+
Assert.assertEquals(
129+
TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), resp.getStatus().getCode());
130+
}
131+
}

0 commit comments

Comments
 (0)