Skip to content

Commit 6532d29

Browse files
authored
Optimize pipe event batching and listener stop (#17864)
1 parent 4d55d51 commit 6532d29

3 files changed

Lines changed: 39 additions & 28 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import java.util.HashMap;
4848
import java.util.List;
4949
import java.util.Map;
50-
import java.util.concurrent.atomic.AtomicLong;
50+
import java.util.Objects;
5151

5252
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
5353

@@ -105,14 +105,18 @@ public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
105105
insertTablets.getValue().entrySet()) {
106106
// needCopyFlag and tablet
107107
final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>();
108+
final int totalRowSize = tabletEntry.getValue().getLeft();
108109
for (final Tablet tablet : tabletEntry.getValue().getRight()) {
109110
boolean success = false;
110111
for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
112+
if (!canAppendTablet(tabletPair.getRight(), tablet)) {
113+
continue;
114+
}
111115
if (tabletPair.getLeft()) {
112116
tabletPair.setRight(copyTablet(tabletPair.getRight()));
113117
tabletPair.setLeft(Boolean.FALSE);
114118
}
115-
if (tabletPair.getRight().append(tablet, tabletEntry.getValue().getLeft())) {
119+
if (tabletPair.getRight().append(tablet, totalRowSize)) {
116120
success = true;
117121
break;
118122
}
@@ -203,21 +207,21 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws
203207
}
204208

205209
private long constructTabletBatch(final Tablet tablet, final String databaseName) {
206-
final AtomicLong size = new AtomicLong(0);
207210
final Pair<Integer, List<Tablet>> currentBatch =
208211
tableModelTabletMap
209-
.computeIfAbsent(
210-
databaseName,
211-
k -> {
212-
size.addAndGet(RamUsageEstimator.sizeOf(databaseName));
213-
return new HashMap<>();
214-
})
212+
.computeIfAbsent(databaseName, k -> new HashMap<>())
215213
.computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>()));
216214
currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
217215
currentBatch.getRight().add(tablet);
218216
return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
219217
}
220218

219+
private static boolean canAppendTablet(final Tablet target, final Tablet source) {
220+
return Objects.equals(target.getDeviceId(), source.getDeviceId())
221+
&& Objects.equals(target.getSchemas(), source.getSchemas())
222+
&& Objects.equals(target.getColumnTypes(), source.getColumnTypes());
223+
}
224+
221225
public static Tablet copyTablet(final Tablet tablet) {
222226
final Object[] copiedValues = new Object[tablet.getValues().length];
223227
for (int i = 0; i < tablet.getValues().length; i++) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@
4040
import java.io.IOException;
4141
import java.util.ArrayList;
4242
import java.util.Arrays;
43+
import java.util.HashMap;
4344
import java.util.List;
4445
import java.util.Map;
4546
import java.util.Objects;
46-
import java.util.concurrent.ConcurrentHashMap;
4747

4848
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
4949
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
@@ -85,8 +85,7 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
8585
// If the leader cache is enabled, the batch will be divided by the leader endpoint,
8686
// each endpoint has a batch.
8787
// This is only used in plain batch since tsfile does not return redirection info.
88-
private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
89-
new ConcurrentHashMap<>();
88+
private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = new HashMap<>();
9089

9190
public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
9291
final boolean usingTsFileBatch =
@@ -182,22 +181,29 @@ public synchronized void onEvent(final TabletInsertionEvent event)
182181
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
183182
getAllNonEmptyAndShouldEmitBatches() {
184183
final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyAndShouldEmitBatches =
185-
new ArrayList<>();
184+
new ArrayList<>(endPointToBatch.size() + 1);
186185
if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
187186
nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
188187
}
189-
endPointToBatch.forEach(
190-
(endPoint, batch) -> {
191-
if (!batch.isEmpty() && batch.shouldEmit()) {
192-
nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
193-
}
194-
});
188+
for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry : endPointToBatch.entrySet()) {
189+
final PipeTabletEventPlainBatch batch = entry.getValue();
190+
if (!batch.isEmpty() && batch.shouldEmit()) {
191+
nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch));
192+
}
193+
}
195194
return nonEmptyAndShouldEmitBatches;
196195
}
197196

198-
public boolean isEmpty() {
199-
return defaultBatch.isEmpty()
200-
&& endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
197+
public synchronized boolean isEmpty() {
198+
if (!defaultBatch.isEmpty()) {
199+
return false;
200+
}
201+
for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
202+
if (!batch.isEmpty()) {
203+
return false;
204+
}
205+
}
206+
return true;
201207
}
202208

203209
public synchronized void discardEventsOfPipe(
@@ -210,12 +216,13 @@ public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
210216
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey));
211217
}
212218

213-
public int size() {
219+
public synchronized int size() {
214220
try {
215-
return defaultBatch.events.size()
216-
+ endPointToBatch.values().stream()
217-
.map(batch -> batch.events.size())
218-
.reduce(0, Integer::sum);
221+
int size = defaultBatch.events.size();
222+
for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
223+
size += batch.events.size();
224+
}
225+
return size;
219226
} catch (final Exception e) {
220227
LOGGER.warn(
221228
DataNodePipeMessages.FAILED_TO_GET_THE_SIZE_OF_PIPETRANSFERBATCHREQBUILDER,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public synchronized void startListenAndAssign(
6666
});
6767
}
6868

69-
public synchronized void stopListenAndAssign(
69+
public void stopListenAndAssign(
7070
final int dataRegionId, final PipeRealtimeDataRegionSource source) {
7171
PipeDataRegionAssigner assignerToClose = null;
7272

0 commit comments

Comments
 (0)