From db7e928baac50cc764e13143a7a8614294b07a2e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 18:26:34 +0800 Subject: [PATCH 1/8] persist in shutdown hook --- .../confignode/manager/ConfigManager.java | 17 +++++++ .../iotdb/confignode/manager/IManager.java | 2 + .../thrift/ConfigNodeRPCServiceProcessor.java | 5 ++ .../agent/task/PipeDataNodeTaskAgent.java | 46 +++++++++++++++---- .../processor/PipeProcessorSubtask.java | 4 +- .../common/heartbeat/PipeHeartbeatEvent.java | 6 +-- .../PipeStatementInsertionEvent.java | 6 +-- .../PipeInsertNodeTabletInsertionEvent.java | 6 +-- .../tablet/PipeRawTabletInsertionEvent.java | 6 +-- .../tsfile/PipeTsFileInsertionEvent.java | 6 +-- .../dataregion/IoTDBDataRegionExtractor.java | 4 +- ...PipeRealtimeDataRegionHybridExtractor.java | 5 +- .../IoTDBSchemaRegionExtractor.java | 4 +- .../db/pipe/metric/PipeDataNodeMetrics.java | 6 +-- ...DataNodeRemainingEventAndTimeOperator.java | 18 +++++++- ...ava => PipeDataNodeSinglePipeMetrics.java} | 15 +++--- .../db/protocol/client/ConfigNodeClient.java | 7 +++ ...formationSchemaContentSupplierFactory.java | 4 +- .../config/sys/pipe/ShowPipeTask.java | 4 +- .../db/service/DataNodeShutdownHook.java | 25 +++++++++- .../iotdb/commons/conf/CommonConfig.java | 14 ++++++ .../iotdb/commons/pipe/config/PipeConfig.java | 5 ++ .../commons/pipe/config/PipeDescriptor.java | 5 ++ .../src/main/thrift/common.thrift | 7 +++ .../src/main/thrift/confignode.thrift | 3 ++ .../src/main/thrift/datanode.thrift | 9 +--- 26 files changed, 180 insertions(+), 59 deletions(-) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/{PipeDataNodeRemainingEventAndTimeMetrics.java => PipeDataNodeSinglePipeMetrics.java} (96%) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index a95b945a73c6b..e3b3de3cfcb2c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -2917,6 +2917,23 @@ public TFetchTableResp fetchTables(final Map> fetchTableMap) : new TFetchTableResp(status); } + @Override + public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) { + final TSStatus status = confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + pipeManager + .getPipeRuntimeCoordinator() + .parseHeartbeat( + dataNodeId, + resp.getPipeMetaList(), + resp.getPipeCompletedList(), + resp.getPipeRemainingEventCountList(), + resp.getPipeRemainingTimeList()); + return StatusUtils.OK; + } + @Override public DataSet registerAINode(TAINodeRegisterReq req) { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index db5cbe11e0054..60d777b135665 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -890,4 +890,6 @@ TDescTableResp describeTable( TDescTable4InformationSchemaResp describeTable4InformationSchema(); TFetchTableResp fetchTables(final Map> fetchTableMap); + + TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index af8e7e593b434..bc037c7e2212e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -1415,4 +1415,9 @@ public TDeleteTableDeviceResp deleteDevice(final TDeleteTableDeviceReq req) { public TSStatus createTableView(final TCreateTableViewReq req) { return configManager.createTableView(req); } + + @Override + public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) { + return configManager.pushHeartbeat(dataNodeId, resp); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 9d00d5a1206ce..f0057db22f25c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; @@ -49,10 +50,13 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode; import org.apache.iotdb.db.schemaengine.SchemaEngine; @@ -67,6 +71,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.collect.ImmutableMap; import org.apache.thrift.TException; @@ -322,13 +327,12 @@ private void closeSchemaRegionListeningQueueIfNecessary( @Override protected void thawRate(final String pipeName, final long creationTime) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime); + PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" + creationTime); } @Override protected void freezeRate(final String pipeName, final long creationTime) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .freezeRate(pipeName + "_" + creationTime); + PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime); } @Override @@ -339,7 +343,7 @@ protected boolean dropPipe(final String pipeName, final long creationTime) { final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId); + PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); return true; } @@ -367,7 +371,7 @@ protected boolean dropPipe(final String pipeName) { final long creationTime = pipeMeta.getStaticMeta().getCreationTime(); final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId); + PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); // When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the // subscribed pipe, so the subscription needs to be manually marked as completed. if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) { @@ -461,7 +465,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; final Pair remainingEventAndTime = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime()); pipeCompletedList.add(isCompleted); pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); @@ -491,7 +495,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro protected void collectPipeMetaListInternal( final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { // Do nothing if data node is removing or removed, or request does not need pipe meta list - if (PipeDataNodeAgent.runtime().isShutdown()) { + if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != Long.MIN_VALUE) { return; } LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId); @@ -533,7 +537,7 @@ protected void collectPipeMetaListInternal( final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; final Pair remainingEventAndTime = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime()); pipeCompletedList.add(isCompleted); pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); @@ -842,7 +846,29 @@ private boolean isSnapshotMode(final PipeParameters parameters) { ///////////////////////// Shutdown Logic ///////////////////////// - public void persistAllProgressIndexLocally() { + public void persistAllProgressIndex() { + persistAllProgressIndexLocally(); + persistAllProgressIndex2ConfigNode(); + } + + private void persistAllProgressIndex2ConfigNode() { + try (final ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(); + collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp); + final TSStatus result = + configNodeClient.pushHeartbeat( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { + LOGGER.warn("Failed to persist progress index to configNode, status: {}", result); + } + } catch (final Exception e) { + LOGGER.warn(e.getMessage()); + } + } + + private void persistAllProgressIndexLocally() { if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { LOGGER.info( "Pipe progress index persist disabled. Skipping persist all progress index locally."); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 1f7262c0c1647..a574712cbbdcd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -167,7 +167,7 @@ protected boolean executeOnce() throws Exception { pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); } PipeProcessorMetrics.getInstance().markTsFileEvent(taskID); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .markTsFileCollectInvocationCount( pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof PipeHeartbeatEvent) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 1339611e7bfe8..8c3c29c315b92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.event.Event; @@ -93,7 +93,7 @@ public PipeHeartbeatEvent( @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseHeartbeatEventCount(pipeName, creationTime); } return true; @@ -104,7 +104,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa // PipeName == null indicates that the event is the raw event at disruptor, // not the event copied and passed to the extractor if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseHeartbeatEventCount(pipeName, creationTime); if (shouldPrintMessage && LOGGER.isDebugEnabled()) { LOGGER.debug(this.toString()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java index 964f186a30834..2befefc286369 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java @@ -28,7 +28,7 @@ import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; import org.apache.iotdb.db.queryengine.plan.statement.Statement; @@ -89,7 +89,7 @@ public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlock, statement.ramBytesUsed() + INSTANCE_SIZE); if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseRawTabletEventCount(pipeName, creationTime); } return true; @@ -98,7 +98,7 @@ public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseRawTabletEventCount(pipeName, creationTime); } allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index b1518d3f4f1fb..c28dba1aec684 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; @@ -204,7 +204,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa try { PipeDataNodeResourceManager.wal().pin(walEntryHandler); if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseInsertNodeEventCount(pipeName, creationTime); PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, ramBytesUsed()); } @@ -240,7 +240,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa } finally { if (Objects.nonNull(pipeName)) { PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, ramBytesUsed()); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseInsertNodeEventCount(pipeName, creationTime, System.nanoTime() - extractTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 5f29402bd36d5..349bfc2f6b055 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; @@ -235,7 +235,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa allocatedMemoryBlock, PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + INSTANCE_SIZE); if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseRawTabletEventCount(pipeName, creationTime); } return true; @@ -244,7 +244,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseRawTabletEventCount(pipeName, creationTime); } allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index aa6e90054f855..f216ff510774f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -36,7 +36,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParserProvider; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; @@ -306,7 +306,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa return false; } finally { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseTsFileEventCount(pipeName, creationTime); } } @@ -330,7 +330,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa return false; } finally { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseTsFileEventCount(pipeName, creationTime, System.nanoTime() - extractTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 3ee2d655ec9b7..7154d901748ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -37,7 +37,7 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -586,7 +586,7 @@ public void customize( // register metric after generating taskID PipeDataRegionExtractorMetrics.getInstance().register(this); PipeTsFileToTabletsMetrics.getInstance().register(this); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this); + PipeDataNodeSinglePipeMetrics.getInstance().register(this); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 92e9ad64f5d64..ae6322376503c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -31,7 +31,7 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -238,8 +238,7 @@ private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) { private boolean mayRemainingInsertNodeEventExceedLimit(final PipeRealtimeEvent event) { final boolean mayRemainingInsertEventExceedLimit = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .mayRemainingInsertEventExceedLimit(pipeID); + PipeDataNodeSinglePipeMetrics.getInstance().mayRemainingInsertEventExceedLimit(pipeID); if (mayRemainingInsertEventExceedLimit && event.mayExtractorUseTablets(this)) { logByLogManager( l -> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java index 7f5b8df681117..0df7fb21d7311 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java @@ -36,7 +36,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionExtractorMetrics; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -98,7 +98,7 @@ public void customize( listenedTypeSet = SchemaRegionListeningFilter.parseListeningPlanTypeSet(parameters); PipeSchemaRegionExtractorMetrics.getInstance().register(this); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this); + PipeDataNodeSinglePipeMetrics.getInstance().register(this); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java index 3f03ce580fd28..dd3873feaf46a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.pipe.metric; import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; @@ -53,7 +53,7 @@ public void bindTo(final AbstractMetricService metricService) { PipeSchemaRegionListenerMetrics.getInstance().bindTo(metricService); PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService); PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService); + PipeDataNodeSinglePipeMetrics.getInstance().bindTo(metricService); PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService); PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService); } @@ -71,7 +71,7 @@ public void unbindFrom(final AbstractMetricService metricService) { PipeSchemaRegionListenerMetrics.getInstance().unbindFrom(metricService); PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService); PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService); + PipeDataNodeSinglePipeMetrics.getInstance().unbindFrom(metricService); PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService); PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java index 86368acf353fa..73d5828534584 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java @@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { +public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { // Calculate from schema region extractors directly for it requires less computation private final Set schemaRegionExtractors = @@ -107,6 +107,22 @@ void decreaseHeartbeatEventCount() { return insertNodeEventCountEMA.insertNodeEMAValue; } + public long getRemainingNonHeartbeatEvents() { + final long remainingEvents = + tsfileEventCount.get() + + rawTabletEventCount.get() + + insertNodeEventCount.get() + + schemaRegionExtractors.stream() + .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount) + .reduce(Long::sum) + .orElse(0L); + + // There are cases where the indicator is negative. For example, after the Pipe is restarted, + // the Processor SubTask is still collecting Events, resulting in a negative count. This + // situation cannot be avoided because the Pipe may be restarted internally. + return remainingEvents >= 0 ? remainingEvents : 0; + } + long getRemainingEvents() { final long remainingEvents = tsfileEventCount.get() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java similarity index 96% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 354a980edfd2a..677d758a162a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -42,15 +42,14 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet { +public class PipeDataNodeSinglePipeMetrics implements IMetricSet { - private static final Logger LOGGER = - LoggerFactory.getLogger(PipeDataNodeRemainingEventAndTimeMetrics.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeSinglePipeMetrics.class); @SuppressWarnings("java:S3077") private volatile AbstractMetricService metricService; - private final Map + public final Map remainingEventAndTimeOperatorMap = new ConcurrentHashMap<>(); private static Histogram PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM = @@ -381,19 +380,19 @@ public Pair getRemainingEventAndTime( private static class PipeDataNodeRemainingEventAndTimeMetricsHolder { - private static final PipeDataNodeRemainingEventAndTimeMetrics INSTANCE = - new PipeDataNodeRemainingEventAndTimeMetrics(); + private static final PipeDataNodeSinglePipeMetrics INSTANCE = + new PipeDataNodeSinglePipeMetrics(); private PipeDataNodeRemainingEventAndTimeMetricsHolder() { // Empty constructor } } - public static PipeDataNodeRemainingEventAndTimeMetrics getInstance() { + public static PipeDataNodeSinglePipeMetrics getInstance() { return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE; } - private PipeDataNodeRemainingEventAndTimeMetrics() { + private PipeDataNodeSinglePipeMetrics() { PipeEventCommitManager.getInstance().setCommitRateMarker(this::markRegionCommit); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 9d0dcd22f9125..17f80b3c1767f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -1418,6 +1418,13 @@ public TSStatus createTableView(TCreateTableViewReq req) throws TException { () -> client.createTableView(req), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) + throws TException { + return executeRemoteCallWithRetry( + () -> client.pushHeartbeat(dataNodeId, resp), status -> !updateConfigNodeLeader(status)); + } + public static class Factory extends ThriftClientFactory { public Factory( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index 85854bdb2dd53..b011a49b0771f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -59,7 +59,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TTableInfo; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -566,7 +566,7 @@ protected void constructLine() { if (remainingEventCount == -1 && remainingTime == -1) { final Pair remainingEventAndTime = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(tPipeInfo.getId(), tPipeInfo.getCreationTime()); remainingEventCount = remainingEventAndTime.getLeft(); remainingTime = remainingEventAndTime.getRight(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java index 0dfd784e0063c..76b8680c378d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; @@ -105,7 +105,7 @@ public static void buildTSBlock( if (remainingEventCount == -1 && remainingTime == -1) { final Pair remainingEventAndTime = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(tPipeInfo.getId(), tPipeInfo.getCreationTime()); remainingEventCount = remainingEventAndTime.getLeft(); remainingTime = remainingEventAndTime.getRight(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index efa628bbcbf46..5fc4f899c65af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -25,12 +25,15 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -44,6 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + public class DataNodeShutdownHook extends Thread { private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class); @@ -118,8 +123,26 @@ public void run() { triggerSnapshotForAllDataRegion(); } + long startTime = System.currentTimeMillis(); + if (PipeDataNodeAgent.task().getPipeCount() != 0) { + for (Map.Entry entry : + PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet()) { + while (entry.getValue().getRemainingNonHeartbeatEvents() > 0) { + if (System.currentTimeMillis() - startTime + > PipeConfig.getInstance().getPipeMaxWaitFinishTime()) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.info("Interrupted when waiting for pipe to finish"); + } + } + } + } // Persist progress index before shutdown to accurate recovery after restart - PipeDataNodeAgent.task().persistAllProgressIndexLocally(); + PipeDataNodeAgent.task().persistAllProgressIndex(); // Shutdown all consensus pipe's receiver PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor(); // Shutdown pipe progressIndex background service diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 5d79b138067c2..b3a5543579098 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -228,6 +228,8 @@ public class CommonConfig { private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20; private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE; + private long pipeMaxWaitFinishTime = 10 * 1000; + private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B private int pipeExtractorMatcherCacheSize = 1024; @@ -1371,6 +1373,18 @@ public void setPipeSubtaskExecutorForcedRestartIntervalMs( pipeSubtaskExecutorForcedRestartIntervalMs); } + public long getPipeMaxWaitFinishTime() { + return pipeMaxWaitFinishTime; + } + + public void setPipeMaxWaitFinishTime(long pipeMaxWaitFinishTime) { + if (this.pipeMaxWaitFinishTime == pipeMaxWaitFinishTime) { + return; + } + this.pipeMaxWaitFinishTime = pipeMaxWaitFinishTime; + logger.info("pipeMaxWaitFinishTime is set to {}.", pipeMaxWaitFinishTime); + } + public int getPipeRealTimeQueuePollTsFileThreshold() { return pipeRealTimeQueuePollTsFileThreshold; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index dc2b4350960e9..439dfec55f452 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -147,6 +147,10 @@ public long getPipeSubtaskExecutorForcedRestartIntervalMs() { return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs(); } + public long getPipeMaxWaitFinishTime() { + return COMMON_CONFIG.getPipeMaxWaitFinishTime(); + } + /////////////////////////////// Extractor /////////////////////////////// public int getPipeExtractorAssignerDisruptorRingBufferSize() { @@ -523,6 +527,7 @@ public void printAllConfigs() { LOGGER.info( "PipeSubtaskExecutorForcedRestartIntervalMs: {}", getPipeSubtaskExecutorForcedRestartIntervalMs()); + LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime()); LOGGER.info( "PipeExtractorAssignerDisruptorRingBufferSize: {}", diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index b754a8581aa4d..db2ff49bbb05d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -596,6 +596,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr "pipe_threshold_allocation_strategy_high_usage_threshold", String.valueOf( config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold())))); + + config.setPipeMaxWaitFinishTime( + Long.parseLong( + properties.getProperty( + "pipe_max_wait_finish_time", String.valueOf(config.getPipeMaxWaitFinishTime())))); } public static void loadPipeExternalConfig( diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index c6f881f06cb10..3287f35b9bb92 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -196,6 +196,13 @@ struct TSetThrottleQuotaReq { 2: required TThrottleQuota throttleQuota } +struct TPipeHeartbeatResp { + 1: required list pipeMetaList + 2: optional list pipeCompletedList + 3: optional list pipeRemainingEventCountList + 4: optional list pipeRemainingTimeList +} + struct TLicense { 1: required i64 licenseIssueTimestamp 2: required i64 expireTimestamp diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 3ce020c731bb3..bf91b410459c4 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -2011,6 +2011,9 @@ service IConfigNodeRPCService { /** Get throttle quota information */ TThrottleQuotaResp getThrottleQuota() + /** Push heartbeat in shutdown */ + common.TSStatus pushHeartbeat(i32 dataNodeId, common.TPipeHeartbeatResp resp) + // ====================================================== // Table Or View // ====================================================== diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 1b8ad67de5cd8..64406b005d969 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -316,13 +316,6 @@ struct TPipeHeartbeatReq { 1: required i64 heartbeatId } -struct TPipeHeartbeatResp { - 1: required list pipeMetaList - 2: optional list pipeCompletedList - 3: optional list pipeRemainingEventCountList - 4: optional list pipeRemainingTimeList -} - enum TSchemaLimitLevel{ DEVICE, TIMESERIES @@ -1118,7 +1111,7 @@ service IDataNodeRPCService { /** * ConfigNode will ask DataNode for pipe meta in every few seconds **/ - TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) + common.TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) /** * Execute CQ on DataNode From c03495f0a9723b95598172af5ab6e6a29440a048 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 18:29:05 +0800 Subject: [PATCH 2/8] Update PipeTaskAgent.java --- .../org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index a4b9dfebe935b..c61f4dc95fd7b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; @@ -35,7 +36,6 @@ import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; From 23563e55ef66d5c5d30890fcbc26a1d224941480 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 18:29:56 +0800 Subject: [PATCH 3/8] Update PipeConfigNodeTaskAgent.java --- .../manager/pipe/agent/task/PipeConfigNodeTaskAgent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java index 0e90174a7f2c8..b115ba25b4adb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.manager.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; @@ -35,7 +36,6 @@ import org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics; import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.exception.PipeException; From ea25d9545c25bc791b9625dd8ac7ad035ce5b368 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 18:54:37 +0800 Subject: [PATCH 4/8] Update DataNodeShutdownHook.java --- .../iotdb/db/service/DataNodeShutdownHook.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 5fc4f899c65af..11faf42973294 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -127,9 +127,16 @@ public void run() { if (PipeDataNodeAgent.task().getPipeCount() != 0) { for (Map.Entry entry : PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet()) { - while (entry.getValue().getRemainingNonHeartbeatEvents() > 0) { + boolean timeout = false; + while (true) { + if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) { + logger.info( + "Successfully waited for pipe {} to finish.", entry.getValue().getPipeName()); + break; + } if (System.currentTimeMillis() - startTime > PipeConfig.getInstance().getPipeMaxWaitFinishTime()) { + timeout = true; break; } try { @@ -139,6 +146,10 @@ public void run() { logger.info("Interrupted when waiting for pipe to finish"); } } + if (timeout) { + logger.info("Timed out when waiting for pipes to finish, will break"); + break; + } } } // Persist progress index before shutdown to accurate recovery after restart From ac7e991a6991cbc822e7324567832dd542566377 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:02:08 +0800 Subject: [PATCH 5/8] Fix --- .../async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java | 2 +- .../client/async/handlers/rpc/PipeHeartbeatRPCHandler.java | 2 +- .../db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index 85ce1253c13c9..46b9f44d8cd1a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.client.async.handlers.rpc; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.commons.client.request.AsyncRequestContext; @@ -33,7 +34,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp; import org.apache.iotdb.mpp.rpc.thrift.TDeviceViewResp; import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java index 569424afbff43..e5fa157961d7e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java @@ -20,8 +20,8 @@ package org.apache.iotdb.confignode.client.async.handlers.rpc; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 82608ecf23aa7..226b26093edb9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TLoadSample; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSender; import org.apache.iotdb.common.rpc.thrift.TServiceType; @@ -244,7 +245,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage; From 22504a2e99178bf689083bbeca9b48cf80ffa1cd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:02:37 +0800 Subject: [PATCH 6/8] Fix2 --- .../apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index f0057db22f25c..8eddc2987eab3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; @@ -67,7 +68,6 @@ import org.apache.iotdb.metrics.utils.SystemMetric; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; From e4abb75441b30d466ca5953021d0030d5b7385fd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:06:38 +0800 Subject: [PATCH 7/8] Fix3 --- .../java/org/apache/iotdb/confignode/manager/ConfigManager.java | 1 + .../main/java/org/apache/iotdb/confignode/manager/IManager.java | 1 + .../confignode/service/thrift/ConfigNodeRPCServiceProcessor.java | 1 + .../org/apache/iotdb/db/protocol/client/ConfigNodeClient.java | 1 + 4 files changed, 4 insertions(+) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index e3b3de3cfcb2c..fdc0ccaeb31e6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSchemaNode; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 60d777b135665..d47d9375c5e3e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index bc037c7e2212e..e8be7f574c8b0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 17f80b3c1767f..8685b380b47ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; From 3e35979b61278bcc8f029b439380f1f281e16037 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:11:44 +0800 Subject: [PATCH 8/8] Update PipeHeartbeatScheduler.java --- .../coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java index 3533b40158d04..120ddb65f8621 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; @@ -31,7 +32,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.slf4j.Logger; import org.slf4j.LoggerFactory;