Skip to content

Commit 4917041

Browse files
committed
try-fix
1 parent 04e2b1e commit 4917041

1 file changed

Lines changed: 22 additions & 3 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
5656
private final List<String> insertNodeDataBases = new ArrayList<>();
5757
private final List<String> tabletDataBases = new ArrayList<>();
5858

59-
private final Map<String, Map<String, Pair<Long, List<Tablet>>>> tableModelTabletMap =
59+
private final Map<String, Map<String, Pair<Integer, List<Tablet>>>> tableModelTabletMap =
6060
new HashMap<>();
6161

6262
// Used to rate limit when transferring data
@@ -100,6 +100,25 @@ public synchronized void onSuccess() {
100100
}
101101

102102
public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
103+
for (final Map.Entry<String, Map<String, Pair<Integer, List<Tablet>>>> insertTablets :
104+
tableModelTabletMap.entrySet()) {
105+
final String databaseName = insertTablets.getKey();
106+
for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
107+
insertTablets.getValue().entrySet()) {
108+
Tablet batchTablet = null;
109+
for (final Tablet tablet : tabletEntry.getValue().getRight()) {
110+
if (Objects.isNull(batchTablet)) {
111+
batchTablet = tablet;
112+
} else {
113+
batchTablet.append(tablet, tabletEntry.getValue().getLeft());
114+
}
115+
}
116+
assert batchTablet != null;
117+
tabletBuffers.add(batchTablet.serialize());
118+
tabletDataBases.add(databaseName);
119+
}
120+
}
121+
103122
return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
104123
binaryBuffers,
105124
insertNodeBuffers,
@@ -172,15 +191,15 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws
172191

173192
private long constructTabletBatch(final Tablet tablet, final String databaseName) {
174193
final AtomicLong size = new AtomicLong(0);
175-
final Pair<Long, List<Tablet>> currentBatch =
194+
final Pair<Integer, List<Tablet>> currentBatch =
176195
tableModelTabletMap
177196
.computeIfAbsent(
178197
databaseName,
179198
k -> {
180199
size.addAndGet(RamUsageEstimator.sizeOf(databaseName));
181200
return new HashMap<>();
182201
})
183-
.computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0L, new ArrayList<>()));
202+
.computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>()));
184203
currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
185204
currentBatch.getRight().add(tablet);
186205
return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);

0 commit comments

Comments
 (0)