6666import java .util .stream .Collectors ;
6767
6868import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_END_TIME_KEY ;
69+ import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE ;
70+ import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY ;
6971import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE ;
7072import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_HISTORY_ENABLE_KEY ;
7173import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_HISTORY_END_TIME_KEY ;
7981import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_MODS_ENABLE_KEY ;
8082import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .EXTRACTOR_START_TIME_KEY ;
8183import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .SOURCE_END_TIME_KEY ;
84+ import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .SOURCE_FORWARDING_PIPE_REQUESTS_KEY ;
8285import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .SOURCE_HISTORY_ENABLE_KEY ;
8386import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .SOURCE_HISTORY_END_TIME_KEY ;
8487import static org .apache .iotdb .commons .pipe .config .constant .PipeExtractorConstant .SOURCE_HISTORY_LOOSE_RANGE_KEY ;
@@ -119,6 +122,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
119122
120123 private boolean isTerminateSignalSent = false ;
121124
125+ private boolean isForwardingPipeRequests ;
126+
122127 private volatile boolean hasBeenStarted = false ;
123128
124129 private Queue <TsFileResource > pendingQueue ;
@@ -330,9 +335,15 @@ public void customize(
330335 || // Should extract deletion
331336 listeningOptionPair .getRight ());
332337
338+ isForwardingPipeRequests =
339+ parameters .getBooleanOrDefault (
340+ Arrays .asList (
341+ EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY , SOURCE_FORWARDING_PIPE_REQUESTS_KEY ),
342+ EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE );
343+
333344 if (LOGGER .isInfoEnabled ()) {
334345 LOGGER .info (
335- "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}" ,
346+ "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}, is forwarding pipe requests: {} " ,
336347 pipeName ,
337348 dataRegionId ,
338349 DateTimeUtils .convertLongToDate (historicalDataExtractionStartTime ),
@@ -341,7 +352,8 @@ public void customize(
341352 historicalDataExtractionEndTime ,
342353 sloppyPattern ,
343354 sloppyTimeRange ,
344- shouldTransferModFile );
355+ shouldTransferModFile ,
356+ isForwardingPipeRequests );
345357 }
346358 }
347359
@@ -444,6 +456,9 @@ public synchronized void start() {
444456 resource ->
445457 // Some resource is marked as deleted but not removed from the list.
446458 !resource .isDeleted ()
459+ // Some resource is generated by pipe. We ignore them if the pipe should
460+ // not transfer pipe requests.
461+ && (!resource .isGeneratedByPipe () || isForwardingPipeRequests )
447462 && (
448463 // Some resource may not be closed due to the control of
449464 // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
@@ -461,6 +476,9 @@ && mayTsFileResourceOverlappedWithPattern(resource)))
461476 resource ->
462477 // Some resource is marked as deleted but not removed from the list.
463478 !resource .isDeleted ()
479+ // Some resource is generated by pipe. We ignore them if the pipe should
480+ // not transfer pipe requests.
481+ && (!resource .isGeneratedByPipe () || isForwardingPipeRequests )
464482 && (
465483 // Some resource may not be closed due to the control of
466484 // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
0 commit comments