Skip to content

Commit 173aa30

Browse files
authored
[To dev/1.3] Pipe: Modify Sink Batch Metrics (#16018) (#16064)
1 parent 212df13 commit 173aa30

9 files changed

Lines changed: 261 additions & 56 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
3535
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
3636
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
37+
import org.apache.iotdb.metrics.type.Histogram;
3738
import org.apache.iotdb.pipe.api.PipeConnector;
3839
import org.apache.iotdb.pipe.api.event.Event;
3940
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -337,6 +338,32 @@ public double getTotalCompressedSize() {
337338
: 0;
338339
}
339340

341+
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
342+
if (outputPipeConnector instanceof IoTDBSink) {
343+
((IoTDBSink) outputPipeConnector).setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
344+
}
345+
}
346+
347+
public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
348+
if (outputPipeConnector instanceof IoTDBSink) {
349+
((IoTDBSink) outputPipeConnector).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
350+
}
351+
}
352+
353+
public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) {
354+
if (outputPipeConnector instanceof IoTDBSink) {
355+
((IoTDBSink) outputPipeConnector)
356+
.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
357+
}
358+
}
359+
360+
public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
361+
if (outputPipeConnector instanceof IoTDBSink) {
362+
((IoTDBSink) outputPipeConnector)
363+
.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
364+
}
365+
}
366+
340367
//////////////////////////// Error report ////////////////////////////
341368

342369
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.iotdb.commons.service.metric.enums.Tag;
2424
import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
2525
import org.apache.iotdb.metrics.AbstractMetricService;
26-
import org.apache.iotdb.metrics.impl.DoNothingHistogram;
2726
import org.apache.iotdb.metrics.metricsets.IMetricSet;
2827
import org.apache.iotdb.metrics.type.Histogram;
2928
import org.apache.iotdb.metrics.type.Rate;
@@ -45,14 +44,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
4544

4645
private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionSinkMetrics.class);
4746

48-
public static Histogram tabletBatchSizeHistogram = new DoNothingHistogram();
49-
50-
public static Histogram tsFileBatchSizeHistogram = new DoNothingHistogram();
51-
52-
public static Histogram tabletBatchTimeIntervalHistogram = new DoNothingHistogram();
53-
54-
public static Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram();
55-
5647
@SuppressWarnings("java:S3077")
5748
private volatile AbstractMetricService metricService;
5849

@@ -75,28 +66,13 @@ public void bindTo(final AbstractMetricService metricService) {
7566
for (String taskID : taskIDs) {
7667
createMetrics(taskID);
7768
}
78-
79-
tabletBatchSizeHistogram =
80-
metricService.getOrCreateHistogram(
81-
Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), MetricLevel.IMPORTANT);
82-
83-
tsFileBatchSizeHistogram =
84-
metricService.getOrCreateHistogram(
85-
Metric.PIPE_TSFILE_BATCH_SIZE.toString(), MetricLevel.IMPORTANT);
86-
87-
tabletBatchTimeIntervalHistogram =
88-
metricService.getOrCreateHistogram(
89-
Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), MetricLevel.IMPORTANT);
90-
91-
tsFileBatchTimeIntervalHistogram =
92-
metricService.getOrCreateHistogram(
93-
Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(), MetricLevel.IMPORTANT);
9469
}
9570

9671
private void createMetrics(final String taskID) {
9772
createAutoGauge(taskID);
9873
createRate(taskID);
9974
createTimer(taskID);
75+
createHistogram(taskID);
10076
}
10177

10278
private void createAutoGauge(final String taskID) {
@@ -245,6 +221,50 @@ private void createTimer(final String taskID) {
245221
String.valueOf(connector.getCreationTime())));
246222
}
247223

224+
private void createHistogram(final String taskID) {
225+
final PipeSinkSubtask connector = connectorMap.get(taskID);
226+
227+
final Histogram tabletBatchSizeHistogram =
228+
metricService.getOrCreateHistogram(
229+
Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
230+
MetricLevel.IMPORTANT,
231+
Tag.NAME.toString(),
232+
connector.getAttributeSortedString(),
233+
Tag.CREATION_TIME.toString(),
234+
String.valueOf(connector.getCreationTime()));
235+
connector.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
236+
237+
final Histogram tsFileBatchSizeHistogram =
238+
metricService.getOrCreateHistogram(
239+
Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
240+
MetricLevel.IMPORTANT,
241+
Tag.NAME.toString(),
242+
connector.getAttributeSortedString(),
243+
Tag.CREATION_TIME.toString(),
244+
String.valueOf(connector.getCreationTime()));
245+
connector.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
246+
247+
final Histogram tabletBatchTimeIntervalHistogram =
248+
metricService.getOrCreateHistogram(
249+
Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
250+
MetricLevel.IMPORTANT,
251+
Tag.NAME.toString(),
252+
connector.getAttributeSortedString(),
253+
Tag.CREATION_TIME.toString(),
254+
String.valueOf(connector.getCreationTime()));
255+
connector.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
256+
257+
final Histogram tsFileBatchTimeIntervalHistogram =
258+
metricService.getOrCreateHistogram(
259+
Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
260+
MetricLevel.IMPORTANT,
261+
Tag.NAME.toString(),
262+
connector.getAttributeSortedString(),
263+
Tag.CREATION_TIME.toString(),
264+
String.valueOf(connector.getCreationTime()));
265+
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
266+
}
267+
248268
@Override
249269
public void unbindFrom(final AbstractMetricService metricService) {
250270
final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(connectorMap.keySet());
@@ -255,20 +275,13 @@ public void unbindFrom(final AbstractMetricService metricService) {
255275
LOGGER.warn(
256276
"Failed to unbind from pipe data region connector metrics, connector map not empty");
257277
}
258-
259-
metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString());
260-
261-
metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_TSFILE_BATCH_SIZE.toString());
262-
263-
metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString());
264-
265-
metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_TSFILE_BATCH_TIME_COST.toString());
266278
}
267279

268280
private void removeMetrics(final String taskID) {
269281
removeAutoGauge(taskID);
270282
removeRate(taskID);
271283
removeTimer(taskID);
284+
removeHistogram(taskID);
272285
}
273286

274287
private void removeAutoGauge(final String taskID) {
@@ -397,6 +410,38 @@ private void removeTimer(final String taskID) {
397410
compressionTimerMap.remove(connector.getAttributeSortedString());
398411
}
399412

413+
private void removeHistogram(final String taskID) {
414+
final PipeSinkSubtask connector = connectorMap.get(taskID);
415+
metricService.remove(
416+
MetricType.HISTOGRAM,
417+
Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
418+
Tag.NAME.toString(),
419+
connector.getAttributeSortedString(),
420+
Tag.CREATION_TIME.toString(),
421+
String.valueOf(connector.getCreationTime()));
422+
metricService.remove(
423+
MetricType.HISTOGRAM,
424+
Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
425+
Tag.NAME.toString(),
426+
connector.getAttributeSortedString(),
427+
Tag.CREATION_TIME.toString(),
428+
String.valueOf(connector.getCreationTime()));
429+
metricService.remove(
430+
MetricType.HISTOGRAM,
431+
Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
432+
Tag.NAME.toString(),
433+
connector.getAttributeSortedString(),
434+
Tag.CREATION_TIME.toString(),
435+
String.valueOf(connector.getCreationTime()));
436+
metricService.remove(
437+
MetricType.HISTOGRAM,
438+
Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
439+
Tag.NAME.toString(),
440+
connector.getAttributeSortedString(),
441+
Tag.CREATION_TIME.toString(),
442+
String.valueOf(connector.getCreationTime()));
443+
}
444+
400445
//////////////////////////// register & deregister (pipe integration) ////////////////////////////
401446

402447
public void register(@NonNull final PipeSinkSubtask pipeSinkSubtask) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@
3636
import java.util.ArrayList;
3737
import java.util.List;
3838
import java.util.Objects;
39+
import java.util.function.BiConsumer;
3940

4041
public abstract class PipeTabletEventBatch implements AutoCloseable {
4142

4243
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);
4344
private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
4445

4546
protected final List<EnrichedEvent> events = new ArrayList<>();
47+
protected final BiConsumer<Long, Long> recordMetric;
4648

4749
private final int maxDelayInMs;
4850
private long firstEventProcessingTime = Long.MIN_VALUE;
@@ -52,12 +54,23 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
5254

5355
protected volatile boolean isClosed = false;
5456

55-
protected PipeTabletEventBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) {
57+
protected PipeTabletEventBatch(
58+
final int maxDelayInMs,
59+
final long requestMaxBatchSizeInBytes,
60+
final BiConsumer<Long, Long> recordMetric) {
5661
if (pipeModelFixedMemoryBlock == null) {
5762
init();
5863
}
5964

6065
this.maxDelayInMs = maxDelayInMs;
66+
if (recordMetric != null) {
67+
this.recordMetric = recordMetric;
68+
} else {
69+
this.recordMetric =
70+
(timeInterval, bufferSize) -> {
71+
// do nothing
72+
};
73+
}
6174

6275
// limit in buffer size
6376
this.allocatedMemoryBlock =
@@ -129,14 +142,12 @@ public boolean shouldEmit() {
129142
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
130143
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
131144
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
132-
recordMetric(diff, totalBufferSize);
145+
recordMetric.accept(diff, totalBufferSize);
133146
return true;
134147
}
135148
return false;
136149
}
137150

138-
protected abstract void recordMetric(final long timeInterval, final long bufferSize);
139-
140151
private long getMaxBatchSizeInBytes() {
141152
return allocatedMemoryBlock.getMemoryUsageInBytes();
142153
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2323
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2424
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
25-
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
2625
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReq;
2726
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
2827
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -42,6 +41,7 @@
4241
import java.util.List;
4342
import java.util.Map;
4443
import java.util.Objects;
44+
import java.util.function.BiConsumer;
4545

4646
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
4747

@@ -55,7 +55,14 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
5555
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new HashMap<>();
5656

5757
PipeTabletEventPlainBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) {
58-
super(maxDelayInMs, requestMaxBatchSizeInBytes);
58+
super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
59+
}
60+
61+
PipeTabletEventPlainBatch(
62+
final int maxDelayInMs,
63+
final long requestMaxBatchSizeInBytes,
64+
final BiConsumer<Long, Long> recordMetric) {
65+
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
5966
}
6067

6168
@Override
@@ -71,12 +78,6 @@ protected boolean constructBatch(final TabletInsertionEvent event)
7178
return true;
7279
}
7380

74-
@Override
75-
protected void recordMetric(long timeInterval, long bufferSize) {
76-
PipeDataRegionSinkMetrics.tabletBatchTimeIntervalHistogram.update(timeInterval);
77-
PipeDataRegionSinkMetrics.tabletBatchSizeHistogram.update(bufferSize);
78-
}
79-
8081
@Override
8182
public synchronized void onSuccess() {
8283
super.onSuccess();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
2525
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2626
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
27-
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
2827
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2928
import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
3029
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -73,6 +72,7 @@
7372
import java.util.Set;
7473
import java.util.concurrent.atomic.AtomicLong;
7574
import java.util.concurrent.atomic.AtomicReference;
75+
import java.util.function.BiConsumer;
7676
import java.util.stream.Collectors;
7777
import java.util.stream.IntStream;
7878

@@ -97,7 +97,14 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
9797
private volatile TsFileWriter fileWriter;
9898

9999
public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) {
100-
super(maxDelayInMs, requestMaxBatchSizeInBytes);
100+
this(maxDelayInMs, requestMaxBatchSizeInBytes, null);
101+
}
102+
103+
public PipeTabletEventTsFileBatch(
104+
final int maxDelayInMs,
105+
final long requestMaxBatchSizeInBytes,
106+
final BiConsumer<Long, Long> recordMetric) {
107+
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
101108

102109
try {
103110
this.batchFileBaseDir = getNextBaseDir();
@@ -182,12 +189,6 @@ protected boolean constructBatch(final TabletInsertionEvent event) {
182189
return true;
183190
}
184191

185-
@Override
186-
protected void recordMetric(long timeInterval, long bufferSize) {
187-
PipeDataRegionSinkMetrics.tsFileBatchTimeIntervalHistogram.update(timeInterval);
188-
PipeDataRegionSinkMetrics.tsFileBatchSizeHistogram.update(bufferSize);
189-
}
190-
191192
private void bufferTablet(
192193
final String pipeName,
193194
final long creationTime,

0 commit comments

Comments
 (0)