Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void setUp() {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
.setPipeMetaSyncerInitialSyncDelayMinutes(1)
.setPipeMetaSyncerSyncIntervalMinutes(1)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
Expand All @@ -85,6 +87,8 @@ public void setUp() {
.setSchemaReplicationFactor(3)
.setDataReplicationFactor(2)
.setDnConnectionTimeoutMs(600000)
.setPipeMetaSyncerInitialSyncDelayMinutes(1)
.setPipeMetaSyncerSyncIntervalMinutes(1)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);

Expand Down Expand Up @@ -184,8 +188,12 @@ public void testSourcePermission() {
"information_schema");

// Grant some privilege
if (!TestUtils.tryExecuteNonQueryWithRetry(
"test", BaseEnv.TABLE_SQL_DIALECT, senderEnv, "grant INSERT on any to user thulab", null)) {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
"test",
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList("grant MANAGE_USER to user thulab", "grant INSERT on any to user thulab"),
null)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ public void testInformationSchema() throws SQLException {
"pipe_source,STRING,ATTRIBUTE,",
"pipe_processor,STRING,ATTRIBUTE,",
"pipe_sink,STRING,ATTRIBUTE,",
"exception_message,STRING,ATTRIBUTE,",
"last_exception,STRING,ATTRIBUTE,",
"suggested_action,STRING,ATTRIBUTE,",
"remaining_event_count,INT64,ATTRIBUTE,",
"estimated_remaining_seconds,DOUBLE,ATTRIBUTE,")));
TestUtils.assertResultSetEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.confignode.manager.pipe.source.ConfigRegionListeningFilter;
Expand Down Expand Up @@ -124,24 +123,32 @@ public TShowPipeResp convertToTShowPipeResp() {
final Integer nodeId = entry.getKey();
final PipeRuntimeException e = entry.getValue();
final String exceptionMessage =
DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage();
"exceptionTime:"
+ DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms")
+ ", "
+ e.getMessage();

pipeExceptionMessage2NodeIdsMap
.computeIfAbsent(exceptionMessage, k -> new TreeSet<>())
.add(nodeId);
}

for (final Map.Entry<Integer, PipeTaskMeta> entry :
runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
final Integer regionId = entry.getKey();
for (final PipeRuntimeException e : entry.getValue().getExceptionMessages()) {
final String exceptionMessage =
DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage();
pipeExceptionMessage2RegionIdsMap
.computeIfAbsent(exceptionMessage, k -> new TreeSet<>())
.add(regionId);
}
}
runtimeMeta
.getConsensusGroupId2TaskMetaMap()
.forEach(
(key, value) ->
value
.getLastException()
.ifPresent(
e ->
pipeExceptionMessage2RegionIdsMap
.computeIfAbsent(
"exceptionTime:"
+ DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms")
+ ", "
+ e.getMessage(),
k -> new TreeSet<>())
.add(key)));

for (final Map.Entry<String, Set<Integer>> entry :
pipeExceptionMessage2NodeIdsMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private void report(
pipeRuntimeException.getMessage(),
pipeRuntimeException);

pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
pipeTaskMeta.trackException(pipeRuntimeException);

// Do not call "stopAllPipesWithCriticalException" because the sinks are not reused in
// ConfigNodeSubtask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,34 +176,34 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromAgent =
pipeMetaFromAgent.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
for (final Map.Entry<Integer, PipeTaskMeta> runtimeMetaFromCoordinator :
for (final Map.Entry<Integer, PipeTaskMeta> taskMetaFromCoordinator :
pipeTaskMetaMapFromCoordinator.entrySet()) {
if (runtimeMetaFromCoordinator.getValue().getLeaderNodeId() != nodeId) {
if (taskMetaFromCoordinator.getValue().getLeaderNodeId() != nodeId) {
continue;
}

final PipeTaskMeta runtimeMetaFromAgent =
pipeTaskMetaMapFromAgent.get(runtimeMetaFromCoordinator.getKey());
if (runtimeMetaFromAgent == null) {
final PipeTaskMeta taskMetaFromAgent =
pipeTaskMetaMapFromAgent.get(taskMetaFromCoordinator.getKey());
if (taskMetaFromAgent == null) {
LOGGER.debug(
"No corresponding Pipe is running in the reported DataRegion. runtimeMetaFromAgent is null, runtimeMetaFromCoordinator: {}",
runtimeMetaFromCoordinator);
taskMetaFromCoordinator);
continue;
}

// Update progress index
if (!(runtimeMetaFromCoordinator
if (!(taskMetaFromCoordinator
.getValue()
.getProgressIndex()
.isAfter(runtimeMetaFromAgent.getProgressIndex())
|| runtimeMetaFromCoordinator
.isAfter(taskMetaFromAgent.getProgressIndex())
|| taskMetaFromCoordinator
.getValue()
.getProgressIndex()
.equals(runtimeMetaFromAgent.getProgressIndex()))) {
.equals(taskMetaFromAgent.getProgressIndex()))) {
final ProgressIndex updatedProgressIndex =
runtimeMetaFromCoordinator
taskMetaFromCoordinator
.getValue()
.updateProgressIndex(runtimeMetaFromAgent.getProgressIndex());
.updateProgressIndex(taskMetaFromAgent.getProgressIndex());
PipeConfigNodeResourceManager.log()
.schedule(
PipeHeartbeatParser.class,
Expand All @@ -216,76 +216,80 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
"Updated progress index for (pipe name: {}, consensus group id: {}) ... "
+ "Progress index on coordinator: {}, progress index from agent: {}, updated progressIndex: {}",
pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
runtimeMetaFromCoordinator.getKey(),
runtimeMetaFromCoordinator.getValue().getProgressIndex(),
runtimeMetaFromAgent.getProgressIndex(),
taskMetaFromCoordinator.getKey(),
taskMetaFromCoordinator.getValue().getProgressIndex(),
taskMetaFromAgent.getProgressIndex(),
updatedProgressIndex));

needWriteConsensusOnConfigNodes.set(true);
}

// Update runtime exception
final PipeTaskMeta pipeTaskMetaFromCoordinator = runtimeMetaFromCoordinator.getValue();
pipeTaskMetaFromCoordinator.clearExceptionMessages();
for (final PipeRuntimeException exception : runtimeMetaFromAgent.getExceptionMessages()) {

// Do not judge the exception's clear time to avoid the restart process
// being ended after the failure of some pipe

pipeTaskMetaFromCoordinator.trackExceptionMessage(exception);

if (exception instanceof PipeRuntimeCriticalException) {
final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
if (!pipeMetaFromCoordinator
.getRuntimeMeta()
.getStatus()
.get()
.equals(PipeStatus.STOPPED)) {
PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta();
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
runtimeMeta.setIsStoppedByRuntimeException(true);

needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
"Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.",
exception,
pipeName);
}

if (exception instanceof PipeRuntimeSinkCriticalException) {
((PipeTableResp) pipeTaskInfo.get().showPipes())
.filter(true, pipeName).getAllPipeMeta().stream()
.filter(pipeMeta -> !pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
.map(PipeMeta::getRuntimeMeta)
.filter(
runtimeMeta -> !runtimeMeta.getStatus().get().equals(PipeStatus.STOPPED))
.forEach(
runtimeMeta -> {
// Record the connector exception for each pipe affected
Map<Integer, PipeRuntimeException> exceptionMap =
runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
if (!exceptionMap.containsKey(nodeId)
|| exceptionMap.get(nodeId).getTimeStamp()
< exception.getTimeStamp()) {
exceptionMap.put(nodeId, exception);
}
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
runtimeMeta.setIsStoppedByRuntimeException(true);

needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
String.format(
"Detect PipeRuntimeConnectorCriticalException %s "
+ "from agent, stop pipe %s.",
exception, pipeName));
});
}
}
}
final PipeTaskMeta pipeTaskMetaFromCoordinator = taskMetaFromCoordinator.getValue();
taskMetaFromAgent
.getLastException()
.ifPresent(
exception -> {
// Do not judge the exception's clear time to avoid the restart process
// being ended after the failure of some pipe

pipeTaskMetaFromCoordinator.trackException(exception);

if (exception instanceof PipeRuntimeCriticalException) {
final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
if (!pipeMetaFromCoordinator
.getRuntimeMeta()
.getStatus()
.get()
.equals(PipeStatus.STOPPED)) {
PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta();
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
runtimeMeta.setIsStoppedByRuntimeException(true);

needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
"Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.",
exception,
pipeName);
}

if (exception instanceof PipeRuntimeSinkCriticalException) {
((PipeTableResp) pipeTaskInfo.get().showPipes())
.filter(true, pipeName).getAllPipeMeta().stream()
.filter(
pipeMeta ->
!pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
.map(PipeMeta::getRuntimeMeta)
.filter(
runtimeMeta ->
!runtimeMeta.getStatus().get().equals(PipeStatus.STOPPED))
.forEach(
runtimeMeta -> {
// Record the sink exception for each pipe affected
final Map<Integer, PipeRuntimeException> exceptionMap =
runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
if (!exceptionMap.containsKey(nodeId)
|| exceptionMap.get(nodeId).getTimeStamp()
< exception.getTimeStamp()) {
exceptionMap.put(nodeId, exception);
}
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
runtimeMeta.setIsStoppedByRuntimeException(true);

needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
String.format(
"Detect PipeRuntimeConnectorCriticalException %s "
+ "from agent, stop pipe %s.",
exception, pipeName));
});
}
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,24 +719,6 @@ private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(

// To avoid unnecessary retries, we set the isStoppedByRuntimeException flag to false
runtimeMeta.setIsStoppedByRuntimeException(false);

runtimeMeta.setExceptionsClearTime(System.currentTimeMillis());

final Map<Integer, PipeRuntimeException> exceptionMap =
runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
if (!exceptionMap.isEmpty()) {
exceptionMap.clear();
}

runtimeMeta
.getConsensusGroupId2TaskMetaMap()
.values()
.forEach(
pipeTaskMeta -> {
if (pipeTaskMeta.getExceptionMessages().iterator().hasNext()) {
pipeTaskMeta.clearExceptionMessages();
}
});
}

public void setIsStoppedByRuntimeExceptionToFalse(final String pipeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeEx
pipeRuntimeException.getMessage(),
pipeRuntimeException);

pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
pipeTaskMeta.trackException(pipeRuntimeException);

// Quick stop all pipes locally if critical exception occurs,
// no need to wait for the next heartbeat cycle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,6 @@ public static DatasetHeader getShowPathsUsingTemplateHeader() {
return new DatasetHeader(ColumnHeaderConstant.showPathsUsingTemplateHeaders, true);
}

public static DatasetHeader getShowPipeSinkTypeHeader() {
return new DatasetHeader(ColumnHeaderConstant.showPipeSinkTypeColumnHeaders, true);
}

public static DatasetHeader getShowPipeSinkHeader() {
return new DatasetHeader(ColumnHeaderConstant.showPipeSinkColumnHeaders, true);
}

public static DatasetHeader getShowPipeHeader() {
return new DatasetHeader(ColumnHeaderConstant.showPipeColumnHeaders, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask.getFunctionType;
import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask.PIPE_PLUGIN_TYPE_BUILTIN;
import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask.PIPE_PLUGIN_TYPE_EXTERNAL;
import static org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask.suggestedActionParser;

public class InformationSchemaContentSupplierFactory {
private InformationSchemaContentSupplierFactory() {}
Expand Down Expand Up @@ -562,8 +563,13 @@ protected void constructLine() {
new Binary(tPipeInfo.getPipeProcessor(), TSFileConfig.STRING_CHARSET));
columnBuilders[5].writeBinary(
new Binary(tPipeInfo.getPipeConnector(), TSFileConfig.STRING_CHARSET));

final String exceptionMessage = tPipeInfo.getExceptionMessage();
columnBuilders[6].writeBinary(
new Binary(tPipeInfo.getExceptionMessage(), TSFileConfig.STRING_CHARSET));
columnBuilders[7].writeBinary(
new Binary(
suggestedActionParser(exceptionMessage).toString(), TSFileConfig.STRING_CHARSET));

// Optional, default 0/0.0
long remainingEventCount = tPipeInfo.getRemainingEventCount();
Expand All @@ -577,8 +583,8 @@ protected void constructLine() {
remainingTime = remainingEventAndTime.getRight();
}

columnBuilders[7].writeLong(tPipeInfo.isSetRemainingEventCount() ? remainingEventCount : -1);
columnBuilders[8].writeDouble(tPipeInfo.isSetEstimatedRemainingTime() ? remainingTime : -1);
columnBuilders[8].writeLong(tPipeInfo.isSetRemainingEventCount() ? remainingEventCount : -1);
columnBuilders[9].writeDouble(tPipeInfo.isSetEstimatedRemainingTime() ? remainingTime : -1);

resultBuilder.declarePosition();
}
Expand Down
Loading
Loading