Skip to content

Commit ae14450

Browse files
authored
[To dev/1.3] Revert Pipe: Try to persist progressIndex in local for resend event more efficently (#15599) (#15926)
* Revert "[To dev/1.3] Pipe/IoTV2: Try to persist progressIndex in local for resend event more efficently (#15599) (#15669)" This reverts commit 59974f4. * Update DataNodeShutdownHook.java * continue revert * fix
1 parent 6751b02 commit ae14450

22 files changed

Lines changed: 48 additions & 407 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,6 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
244244
.equals(PipeStatus.STOPPED)) {
245245
PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta();
246246
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
247-
runtimeMeta.onSetPipeDroppedOrStopped();
248247
runtimeMeta.setIsStoppedByRuntimeException(true);
249248

250249
needWriteConsensusOnConfigNodes.set(true);
@@ -274,7 +273,6 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
274273
exceptionMap.put(nodeId, exception);
275274
}
276275
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
277-
runtimeMeta.onSetPipeDroppedOrStopped();
278276
runtimeMeta.setIsStoppedByRuntimeException(true);
279277

280278
needWriteConsensusOnConfigNodes.set(true);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -616,11 +616,7 @@ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan pla
616616
if (newLeader != -1) {
617617
consensusGroupIdToTaskMetaMap.put(
618618
consensusGroupId.getId(),
619-
new PipeTaskMeta(
620-
MinimumProgressIndex.INSTANCE,
621-
newLeader,
622-
consensusGroupId.getId(),
623-
false));
619+
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
624620
}
625621
// else:
626622
// "The pipe task meta does not contain the data region group {} or
@@ -794,7 +790,6 @@ private boolean recordDataNodePushPipeMetaExceptionsInternal(
794790

795791
// Mark the status of the pipe with exception as stopped
796792
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
797-
runtimeMeta.onSetPipeDroppedOrStopped();
798793
runtimeMeta.setIsStoppedByRuntimeException(true);
799794

800795
final Map<Integer, PipeRuntimeException> exceptionMap =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
155155
// Pipe only collect user's data, filter metric database here.
156156
updatedConsensusGroupIdToTaskMetaMap.put(
157157
regionGroupId.getId(),
158-
new PipeTaskMeta(
159-
currentPipeTaskMeta.getProgressIndex(),
160-
regionLeaderNodeId,
161-
regionGroupId.getId(),
162-
false));
158+
new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), regionLeaderNodeId));
163159
}
164160
});
165161

@@ -174,9 +170,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
174170
new PipeTaskMeta(
175171
configRegionTaskMeta.getProgressIndex(),
176172
// The leader of the config region is the config node itself
177-
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
178-
Integer.MIN_VALUE,
179-
false));
173+
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
180174
}
181175

182176
updatedPipeRuntimeMeta = new PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
171171
groupId.getId(),
172172
new PipeTaskMeta(
173173
new RecoverProgressIndex(senderDataNodeId, new SimpleProgressIndex(0, 0)),
174-
senderDataNodeId,
175-
groupId.getId(),
176-
false));
174+
senderDataNodeId));
177175
} else {
178176
// data regions & schema regions
179177
env.getConfigManager()
@@ -189,11 +187,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
189187
// Pipe only collect user's data, filter out metric database here.
190188
consensusGroupIdToTaskMetaMap.put(
191189
regionGroupId.getId(),
192-
new PipeTaskMeta(
193-
MinimumProgressIndex.INSTANCE,
194-
regionLeaderNodeId,
195-
regionGroupId.getId(),
196-
false));
190+
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId));
197191
}
198192
});
199193

@@ -206,9 +200,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
206200
new PipeTaskMeta(
207201
MinimumProgressIndex.INSTANCE,
208202
// The leader of the config region is the config node itself
209-
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
210-
Integer.MIN_VALUE,
211-
false));
203+
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
212204
}
213205

214206
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2525
import org.apache.iotdb.commons.cluster.NodeStatus;
2626
import org.apache.iotdb.commons.conf.CommonDescriptor;
27-
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
2827
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
2928
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
3029
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -88,8 +87,6 @@ public void run() {
8887
"Reporting ConfigNode shutdown failed. The cluster will still take the current ConfigNode as Running for a few seconds.");
8988
}
9089
}
91-
// Shutdown pipe progressIndex background service
92-
PipePeriodicalJobExecutor.shutdownBackgroundService();
9390

9491
if (LOGGER.isInfoEnabled()) {
9592
LOGGER.info(

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ public void CreatePipePlanV2Test() throws IOException {
849849
extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor");
850850
processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
851851
connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocol.ThriftTransporter");
852-
final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
852+
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
853853
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
854854
pipeTasks.put(1, pipeTaskMeta);
855855
PipeStaticMeta pipeStaticMeta =
@@ -874,8 +874,8 @@ public void AlterPipePlanV2Test() throws IOException {
874874
extractorAttributes.put("pattern", "root.db");
875875
processorAttributes.put("processor", "do-nothing-processor");
876876
connectorAttributes.put("batch.enable", "false");
877-
final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
878-
final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
877+
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
878+
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
879879
pipeTasks.put(1, pipeTaskMeta);
880880
PipeStaticMeta pipeStaticMeta =
881881
new PipeStaticMeta(
@@ -912,8 +912,8 @@ public void DropPipePlanV2Test() throws IOException {
912912

913913
@Test
914914
public void OperateMultiplePipesPlanV2Test() throws IOException {
915-
final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
916-
final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
915+
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
916+
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
917917
pipeTasks.put(1, pipeTaskMeta);
918918
PipeStaticMeta pipeStaticMeta =
919919
new PipeStaticMeta(
@@ -925,8 +925,8 @@ public void OperateMultiplePipesPlanV2Test() throws IOException {
925925
PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
926926
CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
927927

928-
final PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2, 2, false);
929-
final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new ConcurrentHashMap<>();
928+
PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
929+
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new ConcurrentHashMap<>();
930930
pipeTasks.put(2, pipeTaskMeta1);
931931
PipeStaticMeta pipeStaticMeta1 =
932932
new PipeStaticMeta(
@@ -1024,8 +1024,8 @@ public void pipeHandleMetaChangePlanTest() throws IOException {
10241024
new PipeRuntimeMeta(
10251025
new ConcurrentHashMap<Integer, PipeTaskMeta>() {
10261026
{
1027-
put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987, 1, false));
1028-
put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789, 1, false));
1027+
put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987));
1028+
put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789));
10291029
}
10301030
});
10311031
pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public PipeTableResp constructPipeTableResp() {
5454
connectorAttributes.put("host", "127.0.0.1");
5555
connectorAttributes.put("port", "6667");
5656

57-
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
57+
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
5858
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
5959
pipeTasks.put(1, pipeTaskMeta);
6060
PipeStaticMeta pipeStaticMeta =
@@ -74,7 +74,7 @@ public PipeTableResp constructPipeTableResp() {
7474
connectorAttributes1.put("host", "127.0.0.1");
7575
connectorAttributes1.put("port", "6667");
7676

77-
PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
77+
PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
7878
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new ConcurrentHashMap<>();
7979
pipeTasks1.put(1, pipeTaskMeta1);
8080
PipeStaticMeta pipeStaticMeta1 =
@@ -94,7 +94,7 @@ public PipeTableResp constructPipeTableResp() {
9494
connectorAttributes2.put("host", "172.30.30.30");
9595
connectorAttributes2.put("port", "6667");
9696

97-
PipeTaskMeta pipeTaskMeta2 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
97+
PipeTaskMeta pipeTaskMeta2 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
9898
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks2 = new ConcurrentHashMap<>();
9999
pipeTasks2.put(1, pipeTaskMeta2);
100100
PipeStaticMeta pipeStaticMeta2 =

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ public void setUp() throws Exception {
6262
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName());
6363
}
6464
},
65-
new PipeTaskMeta(
66-
MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE, Integer.MIN_VALUE, false)));
65+
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE)));
6766
}
6867

6968
@After

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void testSnapshot() throws TException, IOException {
8686
connectorAttributes.put("host", "127.0.0.1");
8787
connectorAttributes.put("port", "6667");
8888

89-
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
89+
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
9090
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
9191
pipeTasks.put(1, pipeTaskMeta);
9292
PipeStaticMeta pipeStaticMeta =
@@ -121,7 +121,7 @@ public void testManagement() {
121121
extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor");
122122
processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
123123
connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocol.ThriftTransporter");
124-
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
124+
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
125125
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
126126
pipeTasks.put(1, pipeTaskMeta);
127127
PipeStaticMeta pipeStaticMeta =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -825,11 +825,6 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) {
825825
///////////////////////// Shutdown Logic /////////////////////////
826826

827827
public void persistAllProgressIndex() {
828-
persistAllProgressIndexLocally();
829-
persistAllProgressIndex2ConfigNode();
830-
}
831-
832-
private void persistAllProgressIndex2ConfigNode() {
833828
try (final ConfigNodeClient configNodeClient =
834829
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
835830
// Send request to some API server
@@ -848,28 +843,6 @@ private void persistAllProgressIndex2ConfigNode() {
848843
}
849844
}
850845

851-
private void persistAllProgressIndexLocally() {
852-
if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
853-
LOGGER.info(
854-
"Pipe progress index persist disabled. Skipping persist all progress index locally.");
855-
return;
856-
}
857-
if (!tryReadLockWithTimeOut(10)) {
858-
LOGGER.info("Failed to persist all progress index locally because of timeout.");
859-
return;
860-
}
861-
try {
862-
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
863-
pipeMeta.getRuntimeMeta().persistProgressIndex();
864-
}
865-
LOGGER.info("Persist all progress index locally successfully.");
866-
} catch (final Exception e) {
867-
LOGGER.warn("Failed to record all progress index locally, because {}.", e.getMessage(), e);
868-
} finally {
869-
releaseReadLock();
870-
}
871-
}
872-
873846
///////////////////////// Pipe Consensus /////////////////////////
874847

875848
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {

0 commit comments

Comments
 (0)