Skip to content

Commit 23ce17a

Browse files
Pipe/IoTV2: Persist progress index locally before shutdown to ensure accurate recovery after restart (#15779) (#15887)
(cherry picked from commit e227a53)
1 parent 57d4157 commit 23ce17a

4 files changed

Lines changed: 47 additions & 14 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,30 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) {
817817
}
818818
}
819819

820+
///////////////////////// Shutdown Logic /////////////////////////
821+
822+
public void persistAllProgressIndexLocally() {
823+
if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
824+
LOGGER.info(
825+
"Pipe progress index persist disabled. Skipping persist all progress index locally.");
826+
return;
827+
}
828+
if (!tryReadLockWithTimeOut(10)) {
829+
LOGGER.info("Failed to persist all progress index locally because of timeout.");
830+
return;
831+
}
832+
try {
833+
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
834+
pipeMeta.getRuntimeMeta().persistProgressIndex();
835+
}
836+
LOGGER.info("Persist all progress index locally successfully.");
837+
} catch (final Exception e) {
838+
LOGGER.warn("Failed to record all progress index locally, because {}.", e.getMessage(), e);
839+
} finally {
840+
releaseReadLock();
841+
}
842+
}
843+
820844
///////////////////////// Pipe Consensus /////////////////////////
821845

822846
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.consensus.exception.ConsensusException;
3030
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3131
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
32+
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
3233
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
3334
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
3435
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -86,6 +87,8 @@ public void run() {
8687
triggerSnapshotForAllDataRegion();
8788
}
8889

90+
// Persist progress index before shutdown to ensure accurate recovery after restart
91+
PipeDataNodeAgent.task().persistAllProgressIndexLocally();
8992
// Shutdown pipe progressIndex background service
9093
PipePeriodicalJobExecutor.shutdownBackgroundService();
9194

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,15 @@ public void setIsStoppedByRuntimeException(boolean isStoppedByRuntimeException)
140140
this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
141141
}
142142

143+
public void persistProgressIndex() {
144+
// Iterate through all the task metas and persist their progress index
145+
for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
146+
if (taskMeta.getProgressIndex() != null) {
147+
taskMeta.persistProgressIndex();
148+
}
149+
}
150+
}
151+
143152
public ByteBuffer serialize() throws IOException {
144153
PublicBAOS byteArrayOutputStream = new PublicBAOS();
145154
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -113,34 +113,31 @@ public ProgressIndex getProgressIndex() {
113113
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
114114
// only a pipeTaskMeta that needs to updateProgressIndex will persist its progress index
115115
// isRegisterPersistTask is used to avoid multiple threads registering persist task concurrently
116-
if (Objects.nonNull(progressIndexPersistFile)
117-
&& !isRegisterPersistTask.getAndSet(true)
116+
if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
118117
&& this.persistProgressIndexFuture == null
119-
&& PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
118+
&& !isRegisterPersistTask.getAndSet(true)) {
120119
this.persistProgressIndexFuture =
121120
PipePeriodicalJobExecutor.submitBackgroundJob(
122-
() -> {
123-
if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
124-
persistProgressIndex();
125-
}
126-
},
121+
this::persistProgressIndex,
127122
0,
128123
PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
129124
}
130125

131126
progressIndex.updateAndGet(
132127
index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
133-
if (Objects.nonNull(progressIndexPersistFile)
134-
&& updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap
135-
&& PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
128+
129+
if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
130+
&& updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap) {
136131
persistProgressIndex();
137132
}
133+
138134
return progressIndex.get();
139135
}
140136

141-
private synchronized void persistProgressIndex() {
142-
if (lastPersistCount.get() == updateCount.get()) {
143-
// in case of multiple threads calling updateProgressIndex at the same time
137+
public synchronized void persistProgressIndex() {
138+
if (Objects.isNull(progressIndexPersistFile)
139+
// in case of multiple threads calling updateProgressIndex at the same time
140+
|| lastPersistCount.get() == updateCount.get()) {
144141
return;
145142
}
146143

0 commit comments

Comments
 (0)