Skip to content

Commit f481308

Browse files
authored
Pipe: Modify Sink batch event length related metrics (#16066)
1 parent e801053 commit f481308

9 files changed

Lines changed: 61 additions & 31 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,12 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
364364
}
365365
}
366366

367+
public void setEventSizeHistogram(Histogram eventSizeHistogram) {
368+
if (outputPipeConnector instanceof IoTDBSink) {
369+
((IoTDBSink) outputPipeConnector).setBatchEventSizeHistogram(eventSizeHistogram);
370+
}
371+
}
372+
367373
//////////////////////////// Error report ////////////////////////////
368374

369375
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,6 @@ private void createAutoGauge(final String taskID) {
135135
Tag.CREATION_TIME.toString(),
136136
String.valueOf(connector.getCreationTime()));
137137
// Metrics related to IoTDB connector
138-
metricService.createAutoGauge(
139-
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
140-
MetricLevel.IMPORTANT,
141-
connector,
142-
PipeSinkSubtask::getBatchSize,
143-
Tag.NAME.toString(),
144-
connector.getAttributeSortedString(),
145-
Tag.INDEX.toString(),
146-
String.valueOf(connector.getConnectorIndex()),
147-
Tag.CREATION_TIME.toString(),
148-
String.valueOf(connector.getCreationTime()));
149138
metricService.createAutoGauge(
150139
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
151140
MetricLevel.IMPORTANT,
@@ -263,6 +252,14 @@ private void createHistogram(final String taskID) {
263252
Tag.CREATION_TIME.toString(),
264253
String.valueOf(connector.getCreationTime()));
265254
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
255+
256+
Histogram eventSizeHistogram =
257+
metricService.getOrCreateHistogram(
258+
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
259+
MetricLevel.IMPORTANT,
260+
Tag.NAME.toString(),
261+
connector.getAttributeSortedString());
262+
connector.setEventSizeHistogram(eventSizeHistogram);
266263
}
267264

268265
@Override
@@ -334,15 +331,6 @@ private void removeAutoGauge(final String taskID) {
334331
Tag.CREATION_TIME.toString(),
335332
String.valueOf(connector.getCreationTime()));
336333
// Metrics related to IoTDB connector
337-
metricService.remove(
338-
MetricType.AUTO_GAUGE,
339-
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
340-
Tag.NAME.toString(),
341-
connector.getAttributeSortedString(),
342-
Tag.INDEX.toString(),
343-
String.valueOf(connector.getConnectorIndex()),
344-
Tag.CREATION_TIME.toString(),
345-
String.valueOf(connector.getCreationTime()));
346334
metricService.remove(
347335
MetricType.AUTO_GAUGE,
348336
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
@@ -440,6 +428,12 @@ private void removeHistogram(final String taskID) {
440428
connector.getAttributeSortedString(),
441429
Tag.CREATION_TIME.toString(),
442430
String.valueOf(connector.getCreationTime()));
431+
432+
metricService.remove(
433+
MetricType.HISTOGRAM,
434+
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
435+
Tag.NAME.toString(),
436+
connector.getAttributeSortedString());
443437
}
444438

445439
//////////////////////////// register & deregister (pipe integration) ////////////////////////////

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,14 @@
3636
import java.util.ArrayList;
3737
import java.util.List;
3838
import java.util.Objects;
39-
import java.util.function.BiConsumer;
4039

4140
public abstract class PipeTabletEventBatch implements AutoCloseable {
4241

4342
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);
4443
private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
4544

4645
protected final List<EnrichedEvent> events = new ArrayList<>();
47-
protected final BiConsumer<Long, Long> recordMetric;
46+
protected final TriLongConsumer recordMetric;
4847

4948
private final int maxDelayInMs;
5049
private long firstEventProcessingTime = Long.MIN_VALUE;
@@ -57,7 +56,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
5756
protected PipeTabletEventBatch(
5857
final int maxDelayInMs,
5958
final long requestMaxBatchSizeInBytes,
60-
final BiConsumer<Long, Long> recordMetric) {
59+
final TriLongConsumer recordMetric) {
6160
if (pipeModelFixedMemoryBlock == null) {
6261
init();
6362
}
@@ -67,7 +66,7 @@ protected PipeTabletEventBatch(
6766
this.recordMetric = recordMetric;
6867
} else {
6968
this.recordMetric =
70-
(timeInterval, bufferSize) -> {
69+
(timeInterval, bufferSize, events) -> {
7170
// do nothing
7271
};
7372
}
@@ -142,7 +141,7 @@ public boolean shouldEmit() {
142141
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
143142
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
144143
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
145-
recordMetric.accept(diff, totalBufferSize);
144+
recordMetric.accept(diff, totalBufferSize, events.size());
146145
return true;
147146
}
148147
return false;
@@ -235,4 +234,9 @@ public static void init() {
235234
.forceAllocateForModelFixedMemoryBlock(0, PipeMemoryBlockType.BATCH);
236235
}
237236
}
237+
238+
@FunctionalInterface
239+
public interface TriLongConsumer {
240+
void accept(long l1, long l2, long l3);
241+
}
238242
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.List;
4040
import java.util.Map;
4141
import java.util.Objects;
42-
import java.util.function.BiConsumer;
4342

4443
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
4544

@@ -62,7 +61,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
6261
PipeTabletEventPlainBatch(
6362
final int maxDelayInMs,
6463
final long requestMaxBatchSizeInBytes,
65-
final BiConsumer<Long, Long> recordMetric) {
64+
final TriLongConsumer recordMetric) {
6665
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
6766
}
6867

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import java.util.Map;
4545
import java.util.Objects;
4646
import java.util.concurrent.atomic.AtomicLong;
47-
import java.util.function.BiConsumer;
4847

4948
public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
5049

@@ -69,7 +68,7 @@ public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxB
6968
public PipeTabletEventTsFileBatch(
7069
final int maxDelayInMs,
7170
final long requestMaxBatchSizeInBytes,
72-
final BiConsumer<Long, Long> recordMetric) {
71+
final TriLongConsumer recordMetric) {
7372
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
7473

7574
final AtomicLong tsFileIdGenerator = new AtomicLong(0);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
7575
private Histogram tabletBatchTimeIntervalHistogram = new DoNothingHistogram();
7676
private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram();
7777

78+
private Histogram eventSizeHistogram = new DoNothingHistogram();
79+
7880
// If the leader cache is disabled (or unable to find the endpoint of event in the leader cache),
7981
// the event will be stored in the default batch.
8082
private final PipeTabletEventBatch defaultBatch;
@@ -220,14 +222,16 @@ public synchronized void close() {
220222
endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
221223
}
222224

223-
public void recordTabletMetric(long timeInterval, long bufferSize) {
225+
public void recordTabletMetric(long timeInterval, long bufferSize, long eventSize) {
224226
this.tabletBatchTimeIntervalHistogram.update(timeInterval);
225227
this.tabletBatchSizeHistogram.update(bufferSize);
228+
this.eventSizeHistogram.update(eventSize);
226229
}
227230

228-
public void recordTsFileMetric(long timeInterval, long bufferSize) {
231+
public void recordTsFileMetric(long timeInterval, long bufferSize, long eventSize) {
229232
this.tsFileBatchTimeIntervalHistogram.update(timeInterval);
230233
this.tsFileBatchSizeHistogram.update(bufferSize);
234+
this.eventSizeHistogram.update(eventSize);
231235
}
232236

233237
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
@@ -253,4 +257,10 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
253257
this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram;
254258
}
255259
}
260+
261+
public void setEventSizeHistogram(Histogram eventSizeHistogram) {
262+
if (eventSizeHistogram != null) {
263+
this.eventSizeHistogram = eventSizeHistogram;
264+
}
265+
}
256266
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,4 +861,11 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
861861
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
862862
}
863863
}
864+
865+
@Override
866+
public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
867+
if (tabletBatchBuilder != null) {
868+
tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
869+
}
870+
}
864871
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,4 +655,11 @@ public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeInterva
655655
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
656656
}
657657
}
658+
659+
@Override
660+
public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
661+
if (tabletBatchBuilder != null) {
662+
tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
663+
}
664+
}
658665
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,4 +648,8 @@ public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeInterva
648648
public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
649649
// do nothing by default
650650
}
651+
652+
public void setBatchEventSizeHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
653+
// do nothing by default
654+
}
651655
}

0 commit comments

Comments
 (0)