Skip to content

Commit cb2da10

Browse files
authored
[To dev/1.3] Pipe: Optimized the degrading logger & Deleted useless UT & Copied some historical filter logic from dev/1.3 (#16019) (#16020)
* logger * fix-ut * Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
1 parent 6cc148a commit cb2da10

3 files changed

Lines changed: 15 additions & 57 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
3131
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
3232
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
33+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
34+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
3335
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
3436
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
3537
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -41,6 +43,7 @@
4143
import org.slf4j.LoggerFactory;
4244

4345
import java.util.Objects;
46+
import java.util.Optional;
4447

4548
public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {
4649

@@ -219,13 +222,18 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
219222
final boolean mayInsertNodeMemoryReachDangerousThreshold =
220223
floatingMemoryUsageInByte * pipeCount >= totalFloatingMemorySizeInBytes;
221224
if (mayInsertNodeMemoryReachDangerousThreshold && event.mayExtractorUseTablets(this)) {
225+
final PipeDataNodeRemainingEventAndTimeOperator operator =
226+
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
222227
LOGGER.info(
223-
"Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold {}",
228+
"Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}",
224229
pipeName,
225230
dataRegionId,
226231
event.getTsFileEpoch().getFilePath(),
227-
floatingMemoryUsageInByte * pipeCount,
228-
totalFloatingMemorySizeInBytes);
232+
floatingMemoryUsageInByte,
233+
totalFloatingMemorySizeInBytes / pipeCount,
234+
Optional.ofNullable(operator)
235+
.map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount)
236+
.orElse(0));
229237
}
230238
return mayInsertNodeMemoryReachDangerousThreshold;
231239
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ public long getRemainingNonHeartbeatEvents() {
116116
return remainingEvents >= 0 ? remainingEvents : 0;
117117
}
118118

119+
public int getInsertNodeEventCount() {
120+
return insertNodeEventCount.get();
121+
}
122+
119123
long getRemainingEvents() {
120124
final long remainingEvents =
121125
tsfileEventCount.get()

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
2727
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
2828
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
29-
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
3029

3130
import org.junit.Assert;
3231
import org.junit.Test;
@@ -35,24 +34,6 @@
3534

3635
public class PipeConnectorTest {
3736

38-
@Test(expected = PipeParameterNotValidException.class)
39-
public void testIoTDBLegacyPipeConnectorToSelf() throws Exception {
40-
try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
41-
connector.validate(
42-
new PipeParameterValidator(
43-
new PipeParameters(
44-
new HashMap<String, String>() {
45-
{
46-
put(
47-
PipeConnectorConstant.CONNECTOR_KEY,
48-
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
49-
put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1");
50-
put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, "6667");
51-
}
52-
})));
53-
}
54-
}
55-
5637
@Test
5738
public void testIoTDBLegacyPipeConnectorToOthers() {
5839
try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
@@ -73,24 +54,6 @@ public void testIoTDBLegacyPipeConnectorToOthers() {
7354
}
7455
}
7556

76-
@Test(expected = PipeParameterNotValidException.class)
77-
public void testIoTDBThriftSyncConnectorToSelf() throws Exception {
78-
try (IoTDBDataRegionSyncConnector connector = new IoTDBDataRegionSyncConnector()) {
79-
connector.validate(
80-
new PipeParameterValidator(
81-
new PipeParameters(
82-
new HashMap<String, String>() {
83-
{
84-
put(
85-
PipeConnectorConstant.CONNECTOR_KEY,
86-
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
87-
put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1");
88-
put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, "6667");
89-
}
90-
})));
91-
}
92-
}
93-
9457
@Test
9558
public void testIoTDBThriftSyncConnectorToOthers() {
9659
try (IoTDBDataRegionSyncConnector connector = new IoTDBDataRegionSyncConnector()) {
@@ -111,23 +74,6 @@ public void testIoTDBThriftSyncConnectorToOthers() {
11174
}
11275
}
11376

114-
@Test(expected = PipeParameterNotValidException.class)
115-
public void testIoTDBThriftAsyncConnectorToSelf() throws Exception {
116-
try (IoTDBDataRegionAsyncConnector connector = new IoTDBDataRegionAsyncConnector()) {
117-
connector.validate(
118-
new PipeParameterValidator(
119-
new PipeParameters(
120-
new HashMap<String, String>() {
121-
{
122-
put(
123-
PipeConnectorConstant.CONNECTOR_KEY,
124-
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
125-
put(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667");
126-
}
127-
})));
128-
}
129-
}
130-
13177
@Test
13278
public void testIoTDBThriftAsyncConnectorToOthers() {
13379
try (IoTDBDataRegionAsyncConnector connector = new IoTDBDataRegionAsyncConnector()) {

0 commit comments

Comments
 (0)