Skip to content

Commit b7eb193

Browse files
authored
Manually trigger repair data partition (#17530)
1 parent b15f706 commit b7eb193

22 files changed

Lines changed: 391 additions & 47 deletions

File tree

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.it.partition;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.ClusterIT;
25+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
26+
27+
import org.junit.After;
28+
import org.junit.Assert;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
import org.junit.experimental.categories.Category;
32+
import org.junit.runner.RunWith;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.sql.Connection;
37+
import java.sql.SQLException;
38+
import java.sql.Statement;
39+
import java.util.ArrayList;
40+
import java.util.Collections;
41+
import java.util.List;
42+
import java.util.concurrent.CountDownLatch;
43+
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.Executors;
45+
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
48+
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
49+
50+
@RunWith(IoTDBTestRunner.class)
51+
@Category({LocalStandaloneIT.class, ClusterIT.class})
52+
public class DataPartitionTableIntegrityCheckProcedureIT {
53+
private static final Logger LOGGER =
54+
LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedureIT.class);
55+
56+
@Before
57+
public void setUp() {
58+
EnvFactory.getEnv()
59+
.getConfig()
60+
.getCommonConfig()
61+
.setConfigNodeConsensusProtocolClass(RATIS_CONSENSUS)
62+
.setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
63+
.setDataRegionConsensusProtocolClass(RATIS_CONSENSUS)
64+
.setDataReplicationFactor(1);
65+
EnvFactory.getEnv().initClusterEnvironment(1, 1);
66+
}
67+
68+
@After
69+
public void tearDown() throws Exception {
70+
EnvFactory.getEnv().cleanClusterEnvironment();
71+
}
72+
73+
@Test
74+
public void testConcurrentSubmitDataPartitionTableIntegrityCheckProcedure()
75+
throws InterruptedException {
76+
final int threadCount = 10;
77+
final CountDownLatch startLatch = new CountDownLatch(1);
78+
final CountDownLatch finishLatch = new CountDownLatch(threadCount);
79+
final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
80+
81+
final AtomicInteger successCount = new AtomicInteger(0);
82+
final AtomicInteger failCount = new AtomicInteger(0);
83+
final List<String> failureMessages = Collections.synchronizedList(new ArrayList<>());
84+
85+
// Concurrently submit the DataPartitionTableIntegrityCheckProcedure
86+
for (int i = 0; i < threadCount; i++) {
87+
final int threadId = i;
88+
executor.submit(
89+
() -> {
90+
try {
91+
startLatch.await();
92+
93+
try (final Connection connection = EnvFactory.getEnv().getConnection();
94+
final Statement stmt = connection.createStatement()) {
95+
stmt.execute("REPAIR DATA PARTITION TABLE");
96+
successCount.incrementAndGet();
97+
LOGGER.info("Thread {} submitted integrity check successfully", threadId);
98+
}
99+
} catch (final SQLException e) {
100+
failCount.incrementAndGet();
101+
failureMessages.add("Thread " + threadId + " failed: " + e.getMessage());
102+
LOGGER.info(
103+
"Thread {} failed to submit integrity check: {}", threadId, e.getMessage());
104+
} catch (final Exception e) {
105+
failCount.incrementAndGet();
106+
failureMessages.add("Thread " + threadId + " failed unexpectedly: " + e.getMessage());
107+
LOGGER.error("Thread {} unexpected error: {}", threadId, e.getMessage(), e);
108+
} finally {
109+
finishLatch.countDown();
110+
}
111+
});
112+
}
113+
114+
startLatch.countDown();
115+
116+
final boolean completed = finishLatch.await(60, TimeUnit.SECONDS);
117+
Assert.assertTrue("Not all threads completed within timeout", completed);
118+
119+
executor.shutdown();
120+
Assert.assertTrue(
121+
"Executor did not terminate", executor.awaitTermination(10, TimeUnit.SECONDS));
122+
123+
LOGGER.info("Success count: {}, Fail count: {}", successCount.get(), failCount.get());
124+
LOGGER.info("Failure messages: {}", failureMessages);
125+
126+
Assert.assertEquals(
127+
"Only one procedure should be submitted successfully", 1, successCount.get());
128+
Assert.assertEquals(
129+
"The other concurrent submissions should be rejected", threadCount - 1, failCount.get());
130+
}
131+
}

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ utilityStatement
9494
| showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding
9595
| revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile
9696
| removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser
97+
| repairDataPartitionTable
9798
;
9899

99100
/**
@@ -1238,6 +1239,11 @@ stopRepairData
12381239
: STOP REPAIR DATA (ON (LOCAL | CLUSTER))?
12391240
;
12401241

1242+
// Repair Data Partition Table
1243+
repairDataPartitionTable
1244+
: REPAIR DATA PARTITION TABLE
1245+
;
1246+
12411247
// Explain
12421248
explain
12431249
: EXPLAIN (ANALYZE VERBOSE?)? selectStatement?

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,6 @@ public class ConfigNodeConfig {
320320

321321
private long forceWalPeriodForConfigNodeSimpleInMs = 100;
322322

323-
private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000;
324-
325323
public ConfigNodeConfig() {
326324
// empty constructor
327325
}
@@ -1290,13 +1288,4 @@ public long getFailureDetectorPhiAcceptablePauseInMs() {
12901288
public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) {
12911289
this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs;
12921290
}
1293-
1294-
public long getPartitionTableRecoverWaitAllDnUpTimeoutInMs() {
1295-
return partitionTableRecoverWaitAllDnUpTimeoutInMs;
1296-
}
1297-
1298-
public void setPartitionTableRecoverWaitAllDnUpTimeoutInMs(
1299-
long partitionTableRecoverWaitAllDnUpTimeoutInMs) {
1300-
this.partitionTableRecoverWaitAllDnUpTimeoutInMs = partitionTableRecoverWaitAllDnUpTimeoutInMs;
1301-
}
13021291
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -322,23 +322,6 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio
322322
"failure_detector_phi_acceptable_pause_in_ms",
323323
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
324324

325-
long partitionTableRecoverWaitAllDnUpTimeoutInMs =
326-
Long.parseLong(
327-
properties.getProperty(
328-
"partition_table_recover_wait_all_dn_up_timeout_ms",
329-
String.valueOf(conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs())));
330-
if (partitionTableRecoverWaitAllDnUpTimeoutInMs <= 0) {
331-
LOGGER.warn(
332-
"partition_table_recover_wait_all_dn_up_timeout_ms should be greater than 0, "
333-
+ "but current value is {}, ignore that and use the default value {}",
334-
partitionTableRecoverWaitAllDnUpTimeoutInMs,
335-
conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
336-
partitionTableRecoverWaitAllDnUpTimeoutInMs =
337-
conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs();
338-
}
339-
conf.setPartitionTableRecoverWaitAllDnUpTimeoutInMs(
340-
partitionTableRecoverWaitAllDnUpTimeoutInMs);
341-
342325
String leaderDistributionPolicy =
343326
properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy());
344327
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,6 +1157,16 @@ public TDataPartitionTableResp getOrCreateDataPartition(
11571157
return resp;
11581158
}
11591159

1160+
@Override
1161+
public TSStatus dataPartitionTableIntegrityCheck() {
1162+
TSStatus status = confirmLeader();
1163+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
1164+
return status;
1165+
}
1166+
1167+
return partitionManager.dataPartitionTableIntegrityCheck();
1168+
}
1169+
11601170
private void printNewCreatedDataPartition(
11611171
GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
11621172
final String lineSeparator = System.lineSeparator();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,8 @@ TSchemaNodeManagementResp getNodePathsPartition(
477477
TDataPartitionTableResp getOrCreateDataPartition(
478478
GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan);
479479

480+
TSStatus dataPartitionTableIntegrityCheck();
481+
480482
/**
481483
* Get AuditLogger.
482484
*

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
7070
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
7171
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
72-
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
7372
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
7473
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
7574
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
@@ -1376,16 +1375,6 @@ public TSStatus createRegionGroups(
13761375
}
13771376
}
13781377

1379-
/** Used to repair the lost data partition table */
1380-
public TSStatus dataPartitionTableIntegrityCheck() {
1381-
DataPartitionTableIntegrityCheckProcedure procedure;
1382-
synchronized (this) {
1383-
procedure = new DataPartitionTableIntegrityCheckProcedure();
1384-
executor.submitProcedure(procedure);
1385-
}
1386-
return waitingProcedureFinished(procedure, 86400000);
1387-
}
1388-
13891378
/**
13901379
* Generate {@link CreateTriggerProcedure} and wait until it finished.
13911380
*

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
8484
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
8585
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
86+
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
8687
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
8788
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
8889
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -155,6 +156,9 @@ public class PartitionManager {
155156
private final ScheduledExecutorService regionMaintainer;
156157
private Future<?> currentRegionMaintainerFuture;
157158

159+
private final AtomicBoolean dataPartitionTableIntegrityCheckProcedureRunning =
160+
new AtomicBoolean(false);
161+
158162
public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
159163
this.configManager = configManager;
160164
this.partitionInfo = partitionInfo;
@@ -514,6 +518,29 @@ public DataPartitionResp getOrCreateDataPartition(final GetOrCreateDataPartition
514518
return resp;
515519
}
516520

521+
/** Used to repair the lost data partition table */
522+
public TSStatus dataPartitionTableIntegrityCheck() {
523+
if (configManager
524+
.getProcedureManager()
525+
.isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class)
526+
|| !dataPartitionTableIntegrityCheckProcedureRunning.compareAndSet(false, true)) {
527+
return RpcUtils.getStatus(
528+
TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
529+
"DataPartitionTableIntegrityCheckProcedure is already submitted.");
530+
}
531+
532+
synchronized (this) {
533+
DataPartitionTableIntegrityCheckProcedure procedure =
534+
new DataPartitionTableIntegrityCheckProcedure();
535+
getProcedureManager().getExecutor().submitProcedure(procedure);
536+
}
537+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
538+
}
539+
540+
public void markDataPartitionTableIntegrityCheckProcedureFinished() {
541+
dataPartitionTableIntegrityCheckProcedureRunning.set(false);
542+
}
543+
517544
private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
518545
TSStatus status = getConsensusManager().confirmLeader();
519546
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,15 @@ public DataPartitionTableIntegrityCheckProcedure() {
131131
super();
132132
}
133133

134+
@Override
135+
protected void updateMetricsOnFinish(
136+
final ConfigNodeProcedureEnv env, final long runtime, final boolean success) {
137+
super.updateMetricsOnFinish(env, runtime, success);
138+
env.getConfigManager()
139+
.getPartitionManager()
140+
.markDataPartitionTableIntegrityCheckProcedureFinished();
141+
}
142+
134143
@Override
135144
protected Flow executeFromState(
136145
final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state)

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,11 @@ public TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq r
626626
return configManager.getOrCreateDataPartition(getOrCreateDataPartitionReq);
627627
}
628628

629+
@Override
630+
public TSStatus dataPartitionTableIntegrityCheck() {
631+
return configManager.dataPartitionTableIntegrityCheck();
632+
}
633+
629634
@Override
630635
public TSStatus operatePermission(final TAuthorizerReq req) {
631636
ConfigPhysicalPlanType configPhysicalPlanType =

0 commit comments

Comments
 (0)