2020package org .apache .iotdb .db .pipe .extractor .dataregion .realtime ;
2121
2222import org .apache .iotdb .commons .exception .pipe .PipeRuntimeNonCriticalException ;
23- import org .apache .iotdb .commons .pipe .config .PipeConfig ;
2423import org .apache .iotdb .commons .pipe .event .ProgressReportEvent ;
2524import org .apache .iotdb .db .pipe .agent .PipeDataNodeAgent ;
2625import org .apache .iotdb .db .pipe .event .common .heartbeat .PipeHeartbeatEvent ;
2726import org .apache .iotdb .db .pipe .event .common .schema .PipeSchemaRegionWritePlanEvent ;
2827import org .apache .iotdb .db .pipe .event .common .tsfile .PipeTsFileInsertionEvent ;
2928import org .apache .iotdb .db .pipe .event .realtime .PipeRealtimeEvent ;
30- import org .apache .iotdb .db .pipe .extractor .dataregion .IoTDBDataRegionExtractor ;
3129import org .apache .iotdb .db .pipe .extractor .dataregion .realtime .assigner .PipeTsFileEpochProgressIndexKeeper ;
3230import org .apache .iotdb .db .pipe .extractor .dataregion .realtime .epoch .TsFileEpoch ;
3331import org .apache .iotdb .db .pipe .metric .overview .PipeDataNodeRemainingEventAndTimeOperator ;
3432import org .apache .iotdb .db .pipe .metric .overview .PipeDataNodeSinglePipeMetrics ;
35- import org .apache .iotdb .db .pipe .metric .source .PipeDataRegionExtractorMetrics ;
36- import org .apache .iotdb .db .pipe .resource .PipeDataNodeResourceManager ;
3733import org .apache .iotdb .db .pipe .resource .memory .PipeMemoryManager ;
3834import org .apache .iotdb .pipe .api .event .Event ;
3935import org .apache .iotdb .pipe .api .event .dml .insertion .TabletInsertionEvent ;
4238import org .slf4j .Logger ;
4339import org .slf4j .LoggerFactory ;
4440
45- import java .util .Objects ;
4641import java .util .Optional ;
4742
4843public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {
@@ -83,7 +78,7 @@ public boolean isNeedListenToInsertNode() {
8378 private void extractTabletInsertion (final PipeRealtimeEvent event ) {
8479 TsFileEpoch .State state ;
8580
86- if (canNotUseTabletAnyMore (event )) {
81+ if (canNotUseTabletAnymore (event )) {
8782 event .getTsFileEpoch ().migrateState (this , curState -> TsFileEpoch .State .USING_TSFILE );
8883 PipeTsFileEpochProgressIndexKeeper .getInstance ()
8984 .registerProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getResource ());
@@ -163,7 +158,7 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
163158 return TsFileEpoch .State .USING_TSFILE ;
164159 case USING_BOTH :
165160 default :
166- return canNotUseTabletAnyMore (event )
161+ return canNotUseTabletAnymore (event )
167162 ? TsFileEpoch .State .USING_TSFILE
168163 : TsFileEpoch .State .USING_BOTH ;
169164 }
@@ -172,9 +167,10 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
172167 final TsFileEpoch .State state = event .getTsFileEpoch ().getState (this );
173168 switch (state ) {
174169 case USING_TABLET :
175- // Though the data in tsfile event has been extracted in tablet mode, we still need to
176- // extract the tsfile event to help to determine isTsFileEventCountInQueueExceededLimit().
177- // The extracted tsfile event will be discarded in supplyTsFileInsertion.
170+ // If the state is USING_TABLET, discard the event
171+ PipeTsFileEpochProgressIndexKeeper .getInstance ()
172+ .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
173+ return ;
178174 case EMPTY :
179175 case USING_TSFILE :
180176 case USING_BOTH :
@@ -203,17 +199,9 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
203199 }
204200 }
205201
206- private boolean canNotUseTabletAnyMore (final PipeRealtimeEvent event ) {
207- // In the following 4 cases, we should not extract this tablet event. all the data
208- // represented by the tablet event should be carried by the following tsfile event:
209- // the write operation will be throttled, so we should not extract any more tablet events.
210- // 1. The shallow memory usage of the insert node has reached the dangerous threshold.
211- // 2. Deprecated logics (unused by default)
212- return mayInsertNodeMemoryReachDangerousThreshold (event )
213- || canNotUseTabletAnymoreDeprecated (event );
214- }
215-
216- private boolean mayInsertNodeMemoryReachDangerousThreshold (final PipeRealtimeEvent event ) {
202+ // If the insertNode's memory has reached the dangerous threshold, we should not extract any
203+ // tablets.
204+ private boolean canNotUseTabletAnymore (final PipeRealtimeEvent event ) {
217205 final long floatingMemoryUsageInByte =
218206 PipeDataNodeAgent .task ().getFloatingMemoryUsageInByte (pipeName );
219207 final long pipeCount = PipeDataNodeAgent .task ().getPipeCount ();
@@ -225,7 +213,7 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
225213 final PipeDataNodeRemainingEventAndTimeOperator operator =
226214 PipeDataNodeSinglePipeMetrics .getInstance ().remainingEventAndTimeOperatorMap .get (pipeID );
227215 LOGGER .info (
228- "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}" ,
216+ "Pipe task {}@{} canNotUseTabletAnyMore for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}" ,
229217 pipeName ,
230218 dataRegionId ,
231219 event .getTsFileEpoch ().getFilePath (),
@@ -238,83 +226,6 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
238226 return mayInsertNodeMemoryReachDangerousThreshold ;
239227 }
240228
241- /**
242- * These judgements are deprecated, and are only reserved for manual operation and compatibility.
243- */
244- @ Deprecated
245- private boolean canNotUseTabletAnymoreDeprecated (final PipeRealtimeEvent event ) {
246- // In the following 5 cases, we should not extract any more tablet events. all the data
247- // represented by the tablet events should be carried by the following tsfile event:
248- // 1. The number of historical tsFile events to transfer has exceeded the limit.
249- // 2. The number of realtime tsfile events to transfer has exceeded the limit.
250- // 3. The number of linked tsFiles has reached the dangerous threshold.
251- return isHistoricalTsFileEventCountExceededLimit (event )
252- || isRealtimeTsFileEventCountExceededLimit (event )
253- || mayTsFileLinkedCountReachDangerousThreshold (event );
254- }
255-
256- private boolean isHistoricalTsFileEventCountExceededLimit (final PipeRealtimeEvent event ) {
257- if (PipeConfig .getInstance ().getPipeMaxAllowedHistoricalTsFilePerDataRegion ()
258- == Integer .MAX_VALUE ) {
259- return false ;
260- }
261- final IoTDBDataRegionExtractor extractor =
262- PipeDataRegionExtractorMetrics .getInstance ().getExtractorMap ().get (getTaskID ());
263- final boolean isHistoricalTsFileEventCountExceededLimit =
264- Objects .nonNull (extractor )
265- && extractor .getHistoricalTsFileInsertionEventCount ()
266- >= PipeConfig .getInstance ().getPipeMaxAllowedHistoricalTsFilePerDataRegion ();
267- if (isHistoricalTsFileEventCountExceededLimit && event .mayExtractorUseTablets (this )) {
268- LOGGER .info (
269- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1) for tsFile {}: The number of historical tsFile events {} has exceeded the limit {}" ,
270- pipeName ,
271- dataRegionId ,
272- event .getTsFileEpoch ().getFilePath (),
273- extractor .getHistoricalTsFileInsertionEventCount (),
274- PipeConfig .getInstance ().getPipeMaxAllowedHistoricalTsFilePerDataRegion ());
275- }
276- return isHistoricalTsFileEventCountExceededLimit ;
277- }
278-
279- private boolean isRealtimeTsFileEventCountExceededLimit (final PipeRealtimeEvent event ) {
280- if (PipeConfig .getInstance ().getPipeMaxAllowedPendingTsFileEpochPerDataRegion ()
281- == Integer .MAX_VALUE ) {
282- return false ;
283- }
284- final boolean isRealtimeTsFileEventCountExceededLimit =
285- pendingQueue .getTsFileInsertionEventCount ()
286- >= PipeConfig .getInstance ().getPipeMaxAllowedPendingTsFileEpochPerDataRegion ();
287- if (isRealtimeTsFileEventCountExceededLimit && event .mayExtractorUseTablets (this )) {
288- LOGGER .info (
289- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2) for tsFile {}: The number of realtime tsFile events {} has exceeded the limit {}" ,
290- pipeName ,
291- dataRegionId ,
292- event .getTsFileEpoch ().getFilePath (),
293- pendingQueue .getTsFileInsertionEventCount (),
294- PipeConfig .getInstance ().getPipeMaxAllowedPendingTsFileEpochPerDataRegion ());
295- }
296- return isRealtimeTsFileEventCountExceededLimit ;
297- }
298-
299- private boolean mayTsFileLinkedCountReachDangerousThreshold (final PipeRealtimeEvent event ) {
300- if (PipeConfig .getInstance ().getPipeMaxAllowedLinkedTsFileCount () == Long .MAX_VALUE ) {
301- return false ;
302- }
303- final boolean mayTsFileLinkedCountReachDangerousThreshold =
304- PipeDataNodeResourceManager .tsfile ().getLinkedTsFileCount (pipeName )
305- >= PipeConfig .getInstance ().getPipeMaxAllowedLinkedTsFileCount ();
306- if (mayTsFileLinkedCountReachDangerousThreshold && event .mayExtractorUseTablets (this )) {
307- LOGGER .info (
308- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3) for tsFile {}: The number of linked tsFiles {} has reached the dangerous threshold {}" ,
309- pipeName ,
310- dataRegionId ,
311- event .getTsFileEpoch ().getFilePath (),
312- PipeDataNodeResourceManager .tsfile ().getLinkedTsFileCount (pipeName ),
313- PipeConfig .getInstance ().getPipeMaxAllowedLinkedTsFileCount ());
314- }
315- return mayTsFileLinkedCountReachDangerousThreshold ;
316- }
317-
318229 @ Override
319230 public Event supply () {
320231 PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent ) pendingQueue .directPoll ();
@@ -356,103 +267,40 @@ public Event supply() {
356267 }
357268
358269 private Event supplyTabletInsertion (final PipeRealtimeEvent event ) {
359- event
360- .getTsFileEpoch ()
361- .migrateState (
362- this ,
363- state -> {
364- switch (state ) {
365- case EMPTY :
366- return canNotUseTabletAnyMore (event )
367- ? TsFileEpoch .State .USING_TSFILE
368- : TsFileEpoch .State .USING_TABLET ;
369- case USING_TSFILE :
370- return canNotUseTabletAnyMore (event )
371- ? TsFileEpoch .State .USING_TSFILE
372- : TsFileEpoch .State .USING_BOTH ;
373- case USING_TABLET :
374- case USING_BOTH :
375- default :
376- return state ;
377- }
378- });
379-
380- final TsFileEpoch .State state = event .getTsFileEpoch ().getState (this );
381- if (state == TsFileEpoch .State .USING_TSFILE ) {
382- PipeTsFileEpochProgressIndexKeeper .getInstance ()
383- .registerProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getResource ());
384- }
385-
386- switch (state ) {
387- case USING_TSFILE :
388- // If the state is USING_TSFILE, discard the event and poll the next one.
389- return null ;
390- case EMPTY :
391- case USING_TABLET :
392- case USING_BOTH :
393- default :
394- if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
395- return event .getEvent ();
396- } else {
397- // If the event's reference count can not be increased, it means the data represented by
398- // this event is not reliable anymore. but the data represented by this event
399- // has been carried by the following tsfile event, so we can just discard this event.
400- event .getTsFileEpoch ().migrateState (this , s -> TsFileEpoch .State .USING_BOTH );
401- LOGGER .warn (
402- "Discard tablet event {} because it is not reliable anymore. "
403- + "Change the state of TsFileEpoch to USING_TSFILE." ,
404- event );
405- return null ;
406- }
270+ if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
271+ return event .getEvent ();
272+ } else {
273+ // If the event's reference count can not be increased, it means the data represented by
274+ // this event is not reliable anymore. but the data represented by this event
275+ // has been carried by the following tsfile event, so we can just discard this event.
276+ event .getTsFileEpoch ().migrateState (this , s -> TsFileEpoch .State .USING_BOTH );
277+ LOGGER .warn (
278+ "Discard tablet event {} because it is not reliable anymore. "
279+ + "Change the state of TsFileEpoch to USING_BOTH." ,
280+ event );
281+ return null ;
407282 }
408283 }
409284
410285 private Event supplyTsFileInsertion (final PipeRealtimeEvent event ) {
411- event
412- .getTsFileEpoch ()
413- .migrateState (
414- this ,
415- state -> {
416- // This would not happen, but just in case.
417- if (state .equals (TsFileEpoch .State .EMPTY )) {
418- LOGGER .error (
419- String .format ("EMPTY TsFileEpoch when supplying TsFile Event %s" , event ));
420- return TsFileEpoch .State .USING_TSFILE ;
421- }
422- return state ;
423- });
424-
425- final TsFileEpoch .State state = event .getTsFileEpoch ().getState (this );
426- switch (state ) {
427- case USING_TABLET :
428- // If the state is USING_TABLET, discard the event and poll the next one.
429- PipeTsFileEpochProgressIndexKeeper .getInstance ()
430- .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
431- return null ;
432- case EMPTY :
433- case USING_TSFILE :
434- case USING_BOTH :
435- default :
436- if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
437- return event .getEvent ();
438- } else {
439- // If the event's reference count can not be increased, it means the data represented by
440- // this event is not reliable anymore. the data has been lost. we simply discard this
441- // event
442- // and report the exception to PipeRuntimeAgent.
443- final String errorMessage =
444- String .format (
445- "TsFile Event %s can not be supplied because "
446- + "the reference count can not be increased, "
447- + "the data represented by this event is lost" ,
448- event .getEvent ());
449- LOGGER .error (errorMessage );
450- PipeDataNodeAgent .runtime ()
451- .report (pipeTaskMeta , new PipeRuntimeNonCriticalException (errorMessage ));
452- PipeTsFileEpochProgressIndexKeeper .getInstance ()
453- .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
454- return null ;
455- }
286+ if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
287+ return event .getEvent ();
288+ } else {
289+ // If the event's reference count can not be increased, it means the data represented by
290+ // this event is not reliable anymore. the data has been lost. we simply discard this
291+ // event and report the exception to PipeRuntimeAgent.
292+ final String errorMessage =
293+ String .format (
294+ "TsFile Event %s can not be supplied because "
295+ + "the reference count can not be increased, "
296+ + "the data represented by this event is lost" ,
297+ event .getEvent ());
298+ LOGGER .error (errorMessage );
299+ PipeDataNodeAgent .runtime ()
300+ .report (pipeTaskMeta , new PipeRuntimeNonCriticalException (errorMessage ));
301+ PipeTsFileEpochProgressIndexKeeper .getInstance ()
302+ .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
303+ return null ;
456304 }
457305 }
458306}
0 commit comments