Skip to content

Commit 8e9c434

Browse files
authored
Pipe: Modify epoch status metric changes (#16272)
* Pipe: Modify epoch status metric changes * fix
1 parent 935b9a8 commit 8e9c434

6 files changed

Lines changed: 44 additions & 6 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ protected void doExtract(final PipeRealtimeEvent event) {
5353
extractTabletInsertion(event);
5454
} else if (eventToExtract instanceof TsFileInsertionEvent) {
5555
extractTsFileInsertion(event);
56+
event.getTsFileEpoch().clearState(this);
5657
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
5758
extractHeartbeat(event);
5859
} else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ protected void doExtract(PipeRealtimeEvent event) {
4747
extractTabletInsertion(event);
4848
} else if (eventToExtract instanceof TsFileInsertionEvent) {
4949
extractTsFileInsertion(event);
50+
event.getTsFileEpoch().clearState(this);
5051
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
5152
extractHeartbeat(event);
5253
} else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.Objects;
6464
import java.util.Set;
6565
import java.util.concurrent.atomic.AtomicBoolean;
66+
import java.util.concurrent.atomic.AtomicLong;
6667
import java.util.concurrent.atomic.AtomicReference;
6768
import java.util.stream.Collectors;
6869

@@ -121,6 +122,8 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
121122
private boolean sloppyTimeRange; // true to disable time range filter after extraction
122123
private boolean sloppyPattern; // true to disable pattern filter after extraction
123124

125+
private AtomicLong extractEpochSize = new AtomicLong();
126+
124127
// This queue is used to store pending events extracted by the method extract(). The method
125128
// supply() will poll events from this queue and send them to the next pipe plugin.
126129
protected final UnboundedBlockingPendingQueue<Event> pendingQueue =
@@ -646,4 +649,16 @@ public int getEventCount() {
646649
public String getTaskID() {
647650
return taskID;
648651
}
652+
653+
public void increaseExtractEpochSize() {
654+
extractEpochSize.incrementAndGet();
655+
}
656+
657+
public void decreaseExtractEpochSize() {
658+
extractEpochSize.decrementAndGet();
659+
}
660+
661+
public boolean extractEpochSizeIsEmpty() {
662+
return extractEpochSize.get() == 0;
663+
}
649664
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ protected void doExtract(PipeRealtimeEvent event) {
7474
// Ignore the event.
7575
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(), false);
7676
}
77+
78+
event.getTsFileEpoch().clearState(this);
7779
}
7880

7981
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
2424
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
2525

26+
import java.util.Objects;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentMap;
2829
import java.util.concurrent.atomic.AtomicLong;
@@ -49,9 +50,30 @@ public TsFileEpoch.State getState(final PipeRealtimeDataRegionSource extractor)
4950

5051
public void migrateState(
5152
final PipeRealtimeDataRegionSource extractor, final TsFileEpochStateMigrator visitor) {
52-
dataRegionExtractor2State
53-
.computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
54-
.getAndUpdate(visitor::migrate);
53+
AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
54+
55+
if (stateRef == null) {
56+
dataRegionExtractor2State.putIfAbsent(
57+
extractor, stateRef = new AtomicReference<>(State.EMPTY));
58+
extractor.increaseExtractEpochSize();
59+
setExtractorsRecentProcessedTsFileEpochState();
60+
}
61+
62+
State migratedState = visitor.migrate(stateRef.get());
63+
if (!Objects.equals(stateRef.get(), migratedState)) {
64+
stateRef.set(migratedState);
65+
setExtractorsRecentProcessedTsFileEpochState();
66+
}
67+
}
68+
69+
public void clearState(final PipeRealtimeDataRegionSource extractor) {
70+
if (dataRegionExtractor2State.containsKey(extractor)) {
71+
extractor.decreaseExtractEpochSize();
72+
}
73+
if (extractor.extractEpochSizeIsEmpty()) {
74+
PipeDataRegionSourceMetrics.getInstance()
75+
.setRecentProcessedTsFileEpochState(extractor.getTaskID(), State.EMPTY);
76+
}
5577
}
5678

5779
public void setExtractorsRecentProcessedTsFileEpochState() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent(
6161
});
6262

6363
final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
64-
// When all data corresponding to this TsFileEpoch have been extracted, update the state
65-
// of the extractors processing this TsFileEpoch.
66-
epoch.setExtractorsRecentProcessedTsFileEpochState();
6764

6865
LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
6966
return new PipeRealtimeEvent(

0 commit comments

Comments
 (0)