Skip to content

Commit e8683d5

Browse files
authored
[To dev/1.3] Pipe: Enabled waiting for pipes to finish & progress index persist to config node in shutdown hook (#15896)(#15901)
* Pipe: Enabled waiting for pipes to finish & progress index persist to config node in shutdown hook (#15896) * persist in shutdown hook * Update PipeTaskAgent.java * Update PipeConfigNodeTaskAgent.java * Update DataNodeShutdownHook.java * Fix * Fix2 * Fix3 * Update PipeHeartbeatScheduler.java * Update ShowPipeTask.java
1 parent 2d0e1d3 commit e8683d5

30 files changed

Lines changed: 197 additions & 61 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.client.async.handlers.rpc;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2324
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2425
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
2526
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
@@ -32,7 +33,6 @@
3233
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
3334
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
3435
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
35-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
3636
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
3737
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
3838
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
package org.apache.iotdb.confignode.client.async.handlers.rpc;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2324
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
24-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
2525

2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
2727
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2828
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
29+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2930
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
3031
import org.apache.iotdb.common.rpc.thrift.TSStatus;
3132
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
@@ -2562,6 +2563,23 @@ public TThrottleQuotaResp getThrottleQuota() {
25622563
: new TThrottleQuotaResp(status);
25632564
}
25642565

2566+
@Override
2567+
public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
2568+
final TSStatus status = confirmLeader();
2569+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
2570+
return status;
2571+
}
2572+
pipeManager
2573+
.getPipeRuntimeCoordinator()
2574+
.parseHeartbeat(
2575+
dataNodeId,
2576+
resp.getPipeMetaList(),
2577+
resp.getPipeCompletedList(),
2578+
resp.getPipeRemainingEventCountList(),
2579+
resp.getPipeRemainingTimeList());
2580+
return StatusUtils.OK;
2581+
}
2582+
25652583
@Override
25662584
public DataSet registerAINode(TAINodeRegisterReq req) {
25672585
TSStatus status = confirmLeader();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
2323
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2424
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
25+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2526
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2627
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
2728
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
@@ -823,4 +824,6 @@ TDataPartitionTableResp getOrCreateDataPartition(
823824

824825
/** Set space quota. */
825826
TSStatus setSpaceQuota(TSetSpaceQuotaReq req);
827+
828+
TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp);
826829
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.confignode.manager.pipe.agent.task;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2223
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2324
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
2425
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -35,7 +36,6 @@
3536
import org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics;
3637
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
3738
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
38-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
3939
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
4040
import org.apache.iotdb.pipe.api.exception.PipeException;
4141

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2324
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
2425
import org.apache.iotdb.commons.concurrent.ThreadName;
2526
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -31,7 +32,6 @@
3132
import org.apache.iotdb.confignode.manager.ConfigManager;
3233
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
3334
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
34-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
3535

3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2424
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
2525
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
26+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2627
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2728
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
2829
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
@@ -1298,4 +1299,9 @@ public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) {
12981299
public TThrottleQuotaResp getThrottleQuota() {
12991300
return configManager.getThrottleQuota();
13001301
}
1302+
1303+
@Override
1304+
public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
1305+
return configManager.pushHeartbeat(dataNodeId, resp);
1306+
}
13011307
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.pipe.agent.task;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
23+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2224
import org.apache.iotdb.commons.consensus.DataRegionId;
2325
import org.apache.iotdb.commons.consensus.SchemaRegionId;
2426
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -48,11 +50,14 @@
4850
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
4951
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
5052
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
51-
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
53+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
5254
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
5355
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
5456
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
5557
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
58+
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
59+
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
60+
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
5661
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
5762
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
5863
import org.apache.iotdb.db.schemaengine.SchemaEngine;
@@ -63,10 +68,10 @@
6368
import org.apache.iotdb.metrics.utils.SystemMetric;
6469
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
6570
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
66-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
6771
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
6872
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
6973
import org.apache.iotdb.pipe.api.exception.PipeException;
74+
import org.apache.iotdb.rpc.TSStatusCode;
7075

7176
import com.google.common.collect.ImmutableMap;
7277
import org.apache.thrift.TException;
@@ -306,13 +311,12 @@ private void closeSchemaRegionListeningQueueIfNecessary(
306311

307312
@Override
308313
protected void thawRate(final String pipeName, final long creationTime) {
309-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
314+
PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
310315
}
311316

312317
@Override
313318
protected void freezeRate(final String pipeName, final long creationTime) {
314-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
315-
.freezeRate(pipeName + "_" + creationTime);
319+
PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime);
316320
}
317321

318322
@Override
@@ -323,7 +327,7 @@ protected boolean dropPipe(final String pipeName, final long creationTime) {
323327

324328
final String taskId = pipeName + "_" + creationTime;
325329
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
326-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
330+
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
327331

328332
return true;
329333
}
@@ -351,7 +355,7 @@ protected boolean dropPipe(final String pipeName) {
351355
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
352356
final String taskId = pipeName + "_" + creationTime;
353357
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
354-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
358+
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
355359
// When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the
356360
// subscribed pipe, so the subscription needs to be manually marked as completed.
357361
if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
@@ -445,7 +449,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro
445449

446450
final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
447451
final Pair<Long, Double> remainingEventAndTime =
448-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
452+
PipeDataNodeSinglePipeMetrics.getInstance()
449453
.getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
450454
pipeCompletedList.add(isCompleted);
451455
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -475,7 +479,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro
475479
protected void collectPipeMetaListInternal(
476480
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException {
477481
// Do nothing if data node is removing or removed, or request does not need pipe meta list
478-
if (PipeDataNodeAgent.runtime().isShutdown()) {
482+
if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != Long.MIN_VALUE) {
479483
return;
480484
}
481485
LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
@@ -528,7 +532,7 @@ protected void collectPipeMetaListInternal(
528532

529533
final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
530534
final Pair<Long, Double> remainingEventAndTime =
531-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
535+
PipeDataNodeSinglePipeMetrics.getInstance()
532536
.getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
533537
pipeCompletedList.add(isCompleted);
534538
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -819,7 +823,29 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) {
819823

820824
///////////////////////// Shutdown Logic /////////////////////////
821825

822-
public void persistAllProgressIndexLocally() {
826+
public void persistAllProgressIndex() {
827+
persistAllProgressIndexLocally();
828+
persistAllProgressIndex2ConfigNode();
829+
}
830+
831+
private void persistAllProgressIndex2ConfigNode() {
832+
try (final ConfigNodeClient configNodeClient =
833+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
834+
// Send request to some API server
835+
final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
836+
collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
837+
final TSStatus result =
838+
configNodeClient.pushHeartbeat(
839+
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
840+
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
841+
LOGGER.warn("Failed to persist progress index to configNode, status: {}", result);
842+
}
843+
} catch (final Exception e) {
844+
LOGGER.warn(e.getMessage());
845+
}
846+
}
847+
848+
private void persistAllProgressIndexLocally() {
823849
if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
824850
LOGGER.info(
825851
"Pipe progress index persist disabled. Skipping persist all progress index locally.");

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
3232
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
3333
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
34-
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
34+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
3535
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
3636
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
3737
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -140,7 +140,7 @@ protected boolean executeOnce() throws Exception {
140140
} else if (event instanceof TsFileInsertionEvent) {
141141
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
142142
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
143-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
143+
PipeDataNodeSinglePipeMetrics.getInstance()
144144
.markTsFileCollectInvocationCount(
145145
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
146146
} else if (event instanceof PipeHeartbeatEvent) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2626
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2727
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
28-
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
28+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
2929
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
3030
import org.apache.iotdb.db.utils.DateTimeUtils;
3131
import org.apache.iotdb.pipe.api.event.Event;
@@ -83,7 +83,7 @@ public PipeHeartbeatEvent(
8383
@Override
8484
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
8585
if (Objects.nonNull(pipeName)) {
86-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
86+
PipeDataNodeSinglePipeMetrics.getInstance()
8787
.increaseHeartbeatEventCount(pipeName, creationTime);
8888
}
8989
return true;
@@ -94,7 +94,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
9494
// PipeName == null indicates that the event is the raw event at disruptor,
9595
// not the event copied and passed to the extractor
9696
if (Objects.nonNull(pipeName)) {
97-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
97+
PipeDataNodeSinglePipeMetrics.getInstance()
9898
.decreaseHeartbeatEventCount(pipeName, creationTime);
9999
if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
100100
LOGGER.debug(this.toString());

0 commit comments

Comments
 (0)