Skip to content

Commit 851ed27

Browse files
authored
[To dev/1.3] Pipe: Modify Sink batch event length related metrics (#16066) (#16074)
(cherry picked from commit f481308) (cherry picked from commit f481308) # Conflicts: # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
1 parent 173aa30 commit 851ed27

9 files changed

Lines changed: 61 additions & 31 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,12 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
364364
}
365365
}
366366

367+
public void setEventSizeHistogram(Histogram eventSizeHistogram) {
368+
if (outputPipeConnector instanceof IoTDBSink) {
369+
((IoTDBSink) outputPipeConnector).setBatchEventSizeHistogram(eventSizeHistogram);
370+
}
371+
}
372+
367373
//////////////////////////// Error report ////////////////////////////
368374

369375
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,6 @@ private void createAutoGauge(final String taskID) {
135135
Tag.CREATION_TIME.toString(),
136136
String.valueOf(connector.getCreationTime()));
137137
// Metrics related to IoTDB connector
138-
metricService.createAutoGauge(
139-
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
140-
MetricLevel.IMPORTANT,
141-
connector,
142-
PipeSinkSubtask::getBatchSize,
143-
Tag.NAME.toString(),
144-
connector.getAttributeSortedString(),
145-
Tag.INDEX.toString(),
146-
String.valueOf(connector.getConnectorIndex()),
147-
Tag.CREATION_TIME.toString(),
148-
String.valueOf(connector.getCreationTime()));
149138
metricService.createAutoGauge(
150139
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
151140
MetricLevel.IMPORTANT,
@@ -263,6 +252,14 @@ private void createHistogram(final String taskID) {
263252
Tag.CREATION_TIME.toString(),
264253
String.valueOf(connector.getCreationTime()));
265254
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
255+
256+
Histogram eventSizeHistogram =
257+
metricService.getOrCreateHistogram(
258+
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
259+
MetricLevel.IMPORTANT,
260+
Tag.NAME.toString(),
261+
connector.getAttributeSortedString());
262+
connector.setEventSizeHistogram(eventSizeHistogram);
266263
}
267264

268265
@Override
@@ -334,15 +331,6 @@ private void removeAutoGauge(final String taskID) {
334331
Tag.CREATION_TIME.toString(),
335332
String.valueOf(connector.getCreationTime()));
336333
// Metrics related to IoTDB connector
337-
metricService.remove(
338-
MetricType.AUTO_GAUGE,
339-
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
340-
Tag.NAME.toString(),
341-
connector.getAttributeSortedString(),
342-
Tag.INDEX.toString(),
343-
String.valueOf(connector.getConnectorIndex()),
344-
Tag.CREATION_TIME.toString(),
345-
String.valueOf(connector.getCreationTime()));
346334
metricService.remove(
347335
MetricType.AUTO_GAUGE,
348336
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
@@ -440,6 +428,12 @@ private void removeHistogram(final String taskID) {
440428
connector.getAttributeSortedString(),
441429
Tag.CREATION_TIME.toString(),
442430
String.valueOf(connector.getCreationTime()));
431+
432+
metricService.remove(
433+
MetricType.HISTOGRAM,
434+
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
435+
Tag.NAME.toString(),
436+
connector.getAttributeSortedString());
443437
}
444438

445439
//////////////////////////// register & deregister (pipe integration) ////////////////////////////

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,14 @@
3636
import java.util.ArrayList;
3737
import java.util.List;
3838
import java.util.Objects;
39-
import java.util.function.BiConsumer;
4039

4140
public abstract class PipeTabletEventBatch implements AutoCloseable {
4241

4342
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);
4443
private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
4544

4645
protected final List<EnrichedEvent> events = new ArrayList<>();
47-
protected final BiConsumer<Long, Long> recordMetric;
46+
protected final TriLongConsumer recordMetric;
4847

4948
private final int maxDelayInMs;
5049
private long firstEventProcessingTime = Long.MIN_VALUE;
@@ -57,7 +56,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
5756
protected PipeTabletEventBatch(
5857
final int maxDelayInMs,
5958
final long requestMaxBatchSizeInBytes,
60-
final BiConsumer<Long, Long> recordMetric) {
59+
final TriLongConsumer recordMetric) {
6160
if (pipeModelFixedMemoryBlock == null) {
6261
init();
6362
}
@@ -67,7 +66,7 @@ protected PipeTabletEventBatch(
6766
this.recordMetric = recordMetric;
6867
} else {
6968
this.recordMetric =
70-
(timeInterval, bufferSize) -> {
69+
(timeInterval, bufferSize, events) -> {
7170
// do nothing
7271
};
7372
}
@@ -142,7 +141,7 @@ public boolean shouldEmit() {
142141
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
143142
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
144143
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
145-
recordMetric.accept(diff, totalBufferSize);
144+
recordMetric.accept(diff, totalBufferSize, events.size());
146145
return true;
147146
}
148147
return false;
@@ -235,4 +234,9 @@ public static void init() {
235234
.forceAllocateForModelFixedMemoryBlock(0, PipeMemoryBlockType.BATCH);
236235
}
237236
}
237+
238+
@FunctionalInterface
239+
public interface TriLongConsumer {
240+
void accept(long l1, long l2, long l3);
241+
}
238242
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import java.util.List;
4242
import java.util.Map;
4343
import java.util.Objects;
44-
import java.util.function.BiConsumer;
4544

4645
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
4746

@@ -61,7 +60,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
6160
PipeTabletEventPlainBatch(
6261
final int maxDelayInMs,
6362
final long requestMaxBatchSizeInBytes,
64-
final BiConsumer<Long, Long> recordMetric) {
63+
final TriLongConsumer recordMetric) {
6564
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
6665
}
6766

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
import java.util.Set;
7373
import java.util.concurrent.atomic.AtomicLong;
7474
import java.util.concurrent.atomic.AtomicReference;
75-
import java.util.function.BiConsumer;
7675
import java.util.stream.Collectors;
7776
import java.util.stream.IntStream;
7877

@@ -103,7 +102,7 @@ public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxB
103102
public PipeTabletEventTsFileBatch(
104103
final int maxDelayInMs,
105104
final long requestMaxBatchSizeInBytes,
106-
final BiConsumer<Long, Long> recordMetric) {
105+
final TriLongConsumer recordMetric) {
107106
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
108107

109108
try {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
7575
private Histogram tabletBatchTimeIntervalHistogram = new DoNothingHistogram();
7676
private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram();
7777

78+
private Histogram eventSizeHistogram = new DoNothingHistogram();
79+
7880
// If the leader cache is disabled (or unable to find the endpoint of event in the leader cache),
7981
// the event will be stored in the default batch.
8082
private final PipeTabletEventBatch defaultBatch;
@@ -219,14 +221,16 @@ public synchronized void close() {
219221
endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
220222
}
221223

222-
public void recordTabletMetric(long timeInterval, long bufferSize) {
224+
public void recordTabletMetric(long timeInterval, long bufferSize, long eventSize) {
223225
this.tabletBatchTimeIntervalHistogram.update(timeInterval);
224226
this.tabletBatchSizeHistogram.update(bufferSize);
227+
this.eventSizeHistogram.update(eventSize);
225228
}
226229

227-
public void recordTsFileMetric(long timeInterval, long bufferSize) {
230+
public void recordTsFileMetric(long timeInterval, long bufferSize, long eventSize) {
228231
this.tsFileBatchTimeIntervalHistogram.update(timeInterval);
229232
this.tsFileBatchSizeHistogram.update(bufferSize);
233+
this.eventSizeHistogram.update(eventSize);
230234
}
231235

232236
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
@@ -252,4 +256,10 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
252256
this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram;
253257
}
254258
}
259+
260+
public void setEventSizeHistogram(Histogram eventSizeHistogram) {
261+
if (eventSizeHistogram != null) {
262+
this.eventSizeHistogram = eventSizeHistogram;
263+
}
264+
}
255265
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,4 +844,11 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
844844
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
845845
}
846846
}
847+
848+
@Override
849+
public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
850+
if (tabletBatchBuilder != null) {
851+
tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
852+
}
853+
}
847854
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,4 +576,11 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
576576
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
577577
}
578578
}
579+
580+
@Override
581+
public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
582+
if (tabletBatchBuilder != null) {
583+
tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
584+
}
585+
}
579586
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,4 +644,8 @@ public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeInterva
644644
public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
645645
// do nothing by default
646646
}
647+
648+
public void setBatchEventSizeHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
649+
// do nothing by default
650+
}
647651
}

0 commit comments

Comments
 (0)