Skip to content

Commit de0a48f

Browse files
authored
Optimize pipe event batching and listener stop (#17864) (#17885)
1 parent 7cffde5 commit de0a48f

2 files changed

Lines changed: 26 additions & 19 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@
3939
import java.io.IOException;
4040
import java.util.ArrayList;
4141
import java.util.Arrays;
42+
import java.util.HashMap;
4243
import java.util.List;
4344
import java.util.Map;
4445
import java.util.Objects;
45-
import java.util.concurrent.ConcurrentHashMap;
4646

4747
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
4848
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
@@ -84,8 +84,7 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
8484
// If the leader cache is enabled, the batch will be divided by the leader endpoint,
8585
// each endpoint has a batch.
8686
// This is only used in plain batch since tsfile does not return redirection info.
87-
private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
88-
new ConcurrentHashMap<>();
87+
private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = new HashMap<>();
8988

9089
public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
9190
final boolean usingTsFileBatch =
@@ -178,22 +177,29 @@ public synchronized void onEvent(final TabletInsertionEvent event)
178177
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
179178
getAllNonEmptyAndShouldEmitBatches() {
180179
final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyAndShouldEmitBatches =
181-
new ArrayList<>();
180+
new ArrayList<>(endPointToBatch.size() + 1);
182181
if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
183182
nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
184183
}
185-
endPointToBatch.forEach(
186-
(endPoint, batch) -> {
187-
if (!batch.isEmpty() && batch.shouldEmit()) {
188-
nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
189-
}
190-
});
184+
for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry : endPointToBatch.entrySet()) {
185+
final PipeTabletEventPlainBatch batch = entry.getValue();
186+
if (!batch.isEmpty() && batch.shouldEmit()) {
187+
nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch));
188+
}
189+
}
191190
return nonEmptyAndShouldEmitBatches;
192191
}
193192

194-
public boolean isEmpty() {
195-
return defaultBatch.isEmpty()
196-
&& endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
193+
public synchronized boolean isEmpty() {
194+
if (!defaultBatch.isEmpty()) {
195+
return false;
196+
}
197+
for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
198+
if (!batch.isEmpty()) {
199+
return false;
200+
}
201+
}
202+
return true;
197203
}
198204

199205
public synchronized void discardEventsOfPipe(
@@ -206,12 +212,13 @@ public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
206212
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey));
207213
}
208214

209-
public int size() {
215+
public synchronized int size() {
210216
try {
211-
return defaultBatch.events.size()
212-
+ endPointToBatch.values().stream()
213-
.map(batch -> batch.events.size())
214-
.reduce(0, Integer::sum);
217+
int size = defaultBatch.events.size();
218+
for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
219+
size += batch.events.size();
220+
}
221+
return size;
215222
} catch (final Exception e) {
216223
LOGGER.warn(
217224
"Failed to get the size of PipeTransferBatchReqBuilder, return 0. Exception: {}",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public synchronized void startListenAndAssign(
6363
});
6464
}
6565

66-
public synchronized void stopListenAndAssign(
66+
public void stopListenAndAssign(
6767
final String dataRegionId, final PipeRealtimeDataRegionSource source) {
6868
PipeDataRegionAssigner assignerToClose = null;
6969

0 commit comments

Comments
 (0)