Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.client.async.handlers.rpc;

import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
Expand All @@ -33,7 +34,6 @@
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TDeviceViewResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.apache.iotdb.confignode.client.async.handlers.rpc;

import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
Expand Down Expand Up @@ -2917,6 +2918,23 @@ public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap)
: new TFetchTableResp(status);
}

@Override
public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
final TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
pipeManager
.getPipeRuntimeCoordinator()
.parseHeartbeat(
dataNodeId,
resp.getPipeMetaList(),
resp.getPipeCompletedList(),
resp.getPipeRemainingEventCountList(),
resp.getPipeRemainingTimeList());
return StatusUtils.OK;
}

@Override
public DataSet registerAINode(TAINodeRegisterReq req) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
Expand Down Expand Up @@ -890,4 +891,6 @@ TDescTableResp describeTable(
TDescTable4InformationSchemaResp describeTable4InformationSchema();

TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap);

TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.confignode.manager.pipe.agent.task;

import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
Expand All @@ -35,7 +36,6 @@
import org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics;
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.exception.PipeException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;

import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
Expand All @@ -31,7 +32,6 @@
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
Expand Down Expand Up @@ -1415,4 +1416,9 @@ public TDeleteTableDeviceResp deleteDevice(final TDeleteTableDeviceReq req) {
public TSStatus createTableView(final TCreateTableViewReq req) {
return configManager.createTableView(req);
}

@Override
public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
return configManager.pushHeartbeat(dataNodeId, resp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.pipe.agent.task;

import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
Expand Down Expand Up @@ -49,10 +51,13 @@
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
Expand All @@ -63,10 +68,10 @@
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

import com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -322,13 +327,12 @@ private void closeSchemaRegionListeningQueueIfNecessary(

@Override
protected void thawRate(final String pipeName, final long creationTime) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
}

@Override
protected void freezeRate(final String pipeName, final long creationTime) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.freezeRate(pipeName + "_" + creationTime);
PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime);
}

@Override
Expand All @@ -339,7 +343,7 @@ protected boolean dropPipe(final String pipeName, final long creationTime) {

final String taskId = pipeName + "_" + creationTime;
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);

return true;
}
Expand Down Expand Up @@ -367,7 +371,7 @@ protected boolean dropPipe(final String pipeName) {
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
final String taskId = pipeName + "_" + creationTime;
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
// When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the
// subscribed pipe, so the subscription needs to be manually marked as completed.
if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
Expand Down Expand Up @@ -461,7 +465,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro

final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
pipeCompletedList.add(isCompleted);
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
Expand Down Expand Up @@ -491,7 +495,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro
protected void collectPipeMetaListInternal(
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException {
// Do nothing if data node is removing or removed, or request does not need pipe meta list
if (PipeDataNodeAgent.runtime().isShutdown()) {
if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != Long.MIN_VALUE) {
return;
}
LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
Expand Down Expand Up @@ -533,7 +537,7 @@ protected void collectPipeMetaListInternal(

final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
pipeCompletedList.add(isCompleted);
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
Expand Down Expand Up @@ -842,7 +846,29 @@ private boolean isSnapshotMode(final PipeParameters parameters) {

///////////////////////// Shutdown Logic /////////////////////////

public void persistAllProgressIndexLocally() {
public void persistAllProgressIndex() {
persistAllProgressIndexLocally();
persistAllProgressIndex2ConfigNode();
}

private void persistAllProgressIndex2ConfigNode() {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
final TSStatus result =
configNodeClient.pushHeartbeat(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
LOGGER.warn("Failed to persist progress index to configNode, status: {}", result);
}
} catch (final Exception e) {
LOGGER.warn(e.getMessage());
}
}

private void persistAllProgressIndexLocally() {
if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
LOGGER.info(
"Pipe progress index persist disabled. Skipping persist all progress index locally.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.storageengine.StorageEngine;
Expand Down Expand Up @@ -167,7 +167,7 @@ protected boolean executeOnce() throws Exception {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
}
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.markTsFileCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
Expand Down Expand Up @@ -93,7 +93,7 @@ public PipeHeartbeatEvent(
@Override
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.increaseHeartbeatEventCount(pipeName, creationTime);
}
return true;
Expand All @@ -104,7 +104,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseHeartbeatEventCount(pipeName, creationTime);
if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
LOGGER.debug(this.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
Expand Down Expand Up @@ -89,7 +89,7 @@ public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlock, statement.ramBytesUsed() + INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.increaseRawTabletEventCount(pipeName, creationTime);
}
return true;
Expand All @@ -98,7 +98,7 @@ public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseRawTabletEventCount(pipeName, creationTime);
}
allocatedMemoryBlock.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser;
import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
Expand Down Expand Up @@ -204,7 +204,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
try {
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.increaseInsertNodeEventCount(pipeName, creationTime);
PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, ramBytesUsed());
}
Expand Down Expand Up @@ -240,7 +240,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, ramBytesUsed());
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseInsertNodeEventCount(pipeName, creationTime, System.nanoTime() - extractTime);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
Expand Down Expand Up @@ -235,7 +235,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
allocatedMemoryBlock,
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.increaseRawTabletEventCount(pipeName, creationTime);
}
return true;
Expand All @@ -244,7 +244,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseRawTabletEventCount(pipeName, creationTime);
}
allocatedMemoryBlock.close();
Expand Down
Loading
Loading