Skip to content

Commit 061d2de

Browse files
authored
Pipe:Fix DataNodeShutdownHook waiting report logic and add capture history file log (#15952)
* Pipe:Fix DataNodeShutdownHook waiting report logic and add capture history file log * fix
1 parent 736cdf7 commit 061d2de

2 files changed

Lines changed: 17 additions & 7 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -703,13 +703,23 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) {
703703
// instead of replication or something else.
704704
ProgressIndex dedicatedProgressIndex =
705705
tryToExtractLocalProgressIndexForIoTV2(resource.getMaxProgressIndexAfterClose());
706-
return greaterThanStartIndex(dedicatedProgressIndex);
706+
return greaterThanStartIndex(resource, dedicatedProgressIndex);
707707
}
708-
return greaterThanStartIndex(resource.getMaxProgressIndexAfterClose());
708+
return greaterThanStartIndex(resource, resource.getMaxProgressIndexAfterClose());
709709
}
710710

711-
private boolean greaterThanStartIndex(ProgressIndex progressIndex) {
712-
return !startIndex.isAfter(progressIndex) && !startIndex.equals(progressIndex);
711+
private boolean greaterThanStartIndex(PersistentResource resource, ProgressIndex progressIndex) {
712+
if (!startIndex.isAfter(progressIndex) && !startIndex.equals(progressIndex)) {
713+
LOGGER.info(
714+
"Pipe {}@{}: resource {} meets mayTsFileContainUnprocessedData condition, extractor progressIndex: {}, resource ProgressIndex: {}",
715+
pipeName,
716+
dataRegionId,
717+
resource,
718+
startIndex,
719+
progressIndex);
720+
return true;
721+
}
722+
return false;
713723
}
714724

715725
private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) {
@@ -809,7 +819,7 @@ private void extractDeletions(
809819
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
810820
toBeCompared = tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
811821
}
812-
return !greaterThanStartIndex(toBeCompared);
822+
return !greaterThanStartIndex(resource, toBeCompared);
813823
})
814824
.forEach(DeletionResource::decreaseReference);
815825
// Get deletions that should be sent.
@@ -821,7 +831,7 @@ private void extractDeletions(
821831
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
822832
toBeCompared = tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
823833
}
824-
return greaterThanStartIndex(toBeCompared);
834+
return greaterThanStartIndex(resource, toBeCompared);
825835
})
826836
.collect(Collectors.toList());
827837
resourceList.addAll(allDeletionResources);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void run() {
128128
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet()) {
129129
boolean timeout = false;
130130
while (true) {
131-
if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) {
131+
if (entry.getValue().getRemainingNonHeartbeatEvents() == 0) {
132132
logger.info(
133133
"Successfully waited for pipe {} to finish.", entry.getValue().getPipeName());
134134
break;

0 commit comments

Comments
 (0)