Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;

import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;

import java.io.DataOutputStream;
import java.io.IOException;
Expand All @@ -38,7 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {

Expand All @@ -51,13 +54,13 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
private final List<String> insertNodeDataBases = new ArrayList<>();
private final List<String> tabletDataBases = new ArrayList<>();

// database -> tableName -> Pair<size, tablets to batch>
private final Map<String, Map<String, Pair<Integer, List<Tablet>>>> tableModelTabletMap =
new HashMap<>();
Comment thread
Caideyipi marked this conversation as resolved.

// Used to rate limit when transferring data
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new HashMap<>();

PipeTabletEventPlainBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) {
super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
}

PipeTabletEventPlainBatch(
final int maxDelayInMs,
final long requestMaxBatchSizeInBytes,
Expand All @@ -66,9 +69,8 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
}

@Override
protected boolean constructBatch(final TabletInsertionEvent event)
throws WALPipeException, IOException {
final int bufferSize = buildTabletInsertionBuffer(event);
protected boolean constructBatch(final TabletInsertionEvent event) throws IOException {
final long bufferSize = buildTabletInsertionBuffer(event);
totalBufferSize += bufferSize;
pipe2BytesAccumulated.compute(
new Pair<>(
Expand All @@ -89,11 +91,45 @@ public synchronized void onSuccess() {
binaryDataBases.clear();
insertNodeDataBases.clear();
tabletDataBases.clear();
tableModelTabletMap.clear();

pipe2BytesAccumulated.clear();
}

public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
for (final Map.Entry<String, Map<String, Pair<Integer, List<Tablet>>>> insertTablets :
tableModelTabletMap.entrySet()) {
final String databaseName = insertTablets.getKey();
for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
insertTablets.getValue().entrySet()) {
final List<Tablet> batchTablets = new ArrayList<>();
for (final Tablet tablet : tabletEntry.getValue().getRight()) {
boolean success = false;
for (final Tablet batchTablet : batchTablets) {
if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) {
success = true;
break;
}
}
if (!success) {
batchTablets.add(tablet);
}
}
for (final Tablet batchTablet : batchTablets) {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
batchTablet.serialize(outputStream);
ReadWriteIOUtils.write(true, outputStream);
tabletBuffers.add(
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()));
}
tabletDataBases.add(databaseName);
}
}
}

tableModelTabletMap.clear();

return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
binaryBuffers,
insertNodeBuffers,
Expand All @@ -111,57 +147,71 @@ public Map<Pair<String, Long>, Long> getPipe2BytesAccumulated() {
return pipe2BytesAccumulated;
}

private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException, WALPipeException {
int databaseEstimateSize = 0;
private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws IOException {
long estimateSize = 0;
final ByteBuffer buffer;
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) event;
// Read the bytebuffer from the wal file and transfer it directly without serializing or
// deserializing if possible
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
if (Objects.isNull(insertNode)) {
buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
binaryBuffers.add(buffer);
if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
databaseEstimateSize =
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
binaryDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
} else {
databaseEstimateSize = 4;
binaryDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
}
} else {
if (!(insertNode instanceof RelationalInsertTabletNode)) {
buffer = insertNode.serializeToByteBuffer();
insertNodeBuffers.add(buffer);
if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
databaseEstimateSize =
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
estimateSize =
RamUsageEstimator.sizeOf(
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
} else {
databaseEstimateSize = 4;
estimateSize = 4;
insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
}
estimateSize += buffer.limit();
} else {
for (final Tablet tablet :
((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) {
estimateSize +=
constructTabletBatch(
tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
}
}
} else {
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) event;
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), outputStream);
buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
tabletBuffers.add(buffer);
if (pipeRawTabletInsertionEvent.isTableModelEvent()) {
databaseEstimateSize = pipeRawTabletInsertionEvent.getTableModelDatabaseName().length();
tabletDataBases.add(pipeRawTabletInsertionEvent.getTableModelDatabaseName());
estimateSize =
constructTabletBatch(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.getTableModelDatabaseName());
} else {
databaseEstimateSize = 4;
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), outputStream);
buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
estimateSize = 4 + buffer.limit();
tabletBuffers.add(buffer);
tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
}
}
return buffer.limit() + databaseEstimateSize;

return estimateSize;
}

private long constructTabletBatch(final Tablet tablet, final String databaseName) {
final AtomicLong size = new AtomicLong(0);
final Pair<Integer, List<Tablet>> currentBatch =
tableModelTabletMap
.computeIfAbsent(
databaseName,
k -> {
size.addAndGet(RamUsageEstimator.sizeOf(databaseName));
return new HashMap<>();
})
.computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>()));
currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
currentBatch.getRight().add(tablet);
return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {
Expand All @@ -62,14 +64,31 @@ public List<InsertBaseStatement> constructStatements() {

final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
final Map<String, List<InsertRowStatement>> tableModelDatabaseInsertRowStatementMap =
new HashMap<>();

for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement.isWriteToTable()) {
statements.add(statement);
if (statement instanceof InsertRowStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
statements.add(statement);
} else if (statement instanceof InsertRowsStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.addAll(((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
binaryReq));
}
continue;
}
if (statement instanceof InsertRowStatement) {
Expand All @@ -93,7 +112,22 @@ public List<InsertBaseStatement> constructStatements() {
continue;
}
if (statement.isWriteToTable()) {
statements.add(statement);
if (statement instanceof InsertRowStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
statements.add(statement);
} else if (statement instanceof InsertRowsStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.addAll(((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
insertNodeReq));
}
continue;
}
if (statement instanceof InsertRowStatement) {
Expand Down Expand Up @@ -131,6 +165,16 @@ public List<InsertBaseStatement> constructStatements() {
if (!insertMultiTabletsStatement.isEmpty()) {
statements.add(insertMultiTabletsStatement);
}

for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
tableModelDatabaseInsertRowStatementMap.entrySet()) {
final InsertRowsStatement statement = new InsertRowsStatement();
statement.setWriteToTable(true);
statement.setDatabaseName(insertRows.getKey());
statement.setInsertRowStatementList(insertRows.getValue());
statements.add(statement);
}

return statements;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected Object reorderValueListAndBitMap(
private int getLastNonnullIndex(
final int i, final BitMap originalBitMap, final BitMap deDuplicatedBitMap) {
if (deDuplicatedIndex == null) {
if (originalBitMap.isMarked(index[i])) {
if (originalBitMap != null && originalBitMap.isMarked(index[i])) {
deDuplicatedBitMap.mark(i);
}
return index[i];
Expand Down
Loading