Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
Expand All @@ -39,8 +40,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent.isTabletEmpty;

public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {

Expand All @@ -62,14 +68,47 @@ public List<InsertBaseStatement> constructStatements() {

final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
final Map<String, List<InsertRowStatement>> tableModelDatabaseInsertRowStatementMap =
new HashMap<>();
final Map<String, Map<String, Tablet>> tableModelDBTable2TabletMap = new HashMap<>();
final AtomicReference<Exception> lastExcept = new AtomicReference<>();

for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement.isWriteToTable()) {
statements.add(statement);
if (statement instanceof InsertRowStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
tableModelDBTable2TabletMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new HashMap<>())
.compute(
statement.getTableName(),
(k, v) -> {
final Tablet tablet = ((InsertTabletStatement) statement).convertToTablet();
if (Objects.isNull(v)) {
return tablet;
} else {
if (!v.append(tablet)) {
lastExcept.set(null);
}
return v;
}
});
} else if (statement instanceof InsertRowsStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.addAll(((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
binaryReq));
}
continue;
}
if (statement instanceof InsertRowStatement) {
Expand All @@ -93,7 +132,36 @@ public List<InsertBaseStatement> constructStatements() {
continue;
}
if (statement.isWriteToTable()) {
statements.add(statement);
if (statement instanceof InsertRowStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
tableModelDBTable2TabletMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new HashMap<>())
.compute(
statement.getTableName(),
(k, v) -> {
final Tablet tablet = ((InsertTabletStatement) statement).convertToTablet();
if (Objects.isNull(v)) {
return tablet;
} else {
if (!v.append(tablet)) {
lastExcept.set(null);
}
return v;
}
});
} else if (statement instanceof InsertRowsStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.addAll(((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
insertNodeReq));
}
continue;
}
if (statement instanceof InsertRowStatement) {
Expand All @@ -112,15 +180,29 @@ public List<InsertBaseStatement> constructStatements() {
}

for (final PipeTransferTabletRawReqV2 tabletReq : tabletReqs) {
final InsertTabletStatement statement = tabletReq.constructStatement();
if (statement.isEmpty()) {
final Tablet tablet = tabletReq.tablet;
if (isTabletEmpty(tablet)) {
continue;
}
if (statement.isWriteToTable()) {
statements.add(statement);
if (Objects.nonNull(tabletReq.dataBaseName)) {
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
tableModelDBTable2TabletMap
.computeIfAbsent(tabletReq.dataBaseName, k -> new HashMap<>())
.compute(
tablet.getTableName(),
(k, v) -> {
if (Objects.isNull(v)) {
return tablet;
} else {
if (!v.append(tablet)) {
lastExcept.set(null);
}
return v;
}
});
continue;
}
insertTabletStatementList.add(statement);
insertTabletStatementList.add(tabletReq.constructStatement());
}

insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
Expand All @@ -131,6 +213,25 @@ public List<InsertBaseStatement> constructStatements() {
if (!insertMultiTabletsStatement.isEmpty()) {
statements.add(insertMultiTabletsStatement);
}

for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
tableModelDatabaseInsertRowStatementMap.entrySet()) {
final InsertRowsStatement statement = new InsertRowsStatement();
statement.setWriteToTable(true);
statement.setDatabaseName(insertRows.getKey());
statement.setInsertRowStatementList(insertRows.getValue());
statements.add(statement);
}

for (final Map.Entry<String, Map<String, Tablet>> insertTablets :
tableModelDBTable2TabletMap.entrySet()) {
final String databaseName = insertTablets.getKey();
for (final Map.Entry<String, Tablet> tablet : insertTablets.getValue().entrySet()) {
// The tablets in table model are all aligned
statements.add(
PipeTransferTabletRawReqV2.constructStatement(tablet.getValue(), databaseName, true));
}
}
return statements;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public String getDataBaseName() {

@Override
public InsertTabletStatement constructStatement() {
return constructStatement(tablet, dataBaseName, isAligned);
}

public static InsertTabletStatement constructStatement(
final Tablet tablet, final String dataBaseName, final boolean isAligned) {
if (Objects.isNull(dataBaseName)) {
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;

import java.util.ArrayList;
Expand All @@ -59,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class InsertTabletStatement extends InsertBaseStatement implements ISchemaValidation {

Expand Down Expand Up @@ -141,6 +144,22 @@ public void setTimes(long[] times) {
this.times = times;
}

public Tablet convertToTablet() {
final Tablet tablet =
new Tablet(
getTableName(),
Arrays.asList(measurements),
Arrays.asList(dataTypes),
Arrays.stream(columnCategories)
.map(TsTableColumnCategory::toTsFileColumnType)
.collect(Collectors.toList()),
rowCount);
tablet.setBitMaps(nullBitMaps);
tablet.setTimestamps(times);
tablet.setValues(columns);
return tablet;
}

@Override
public boolean isEmpty() {
return rowCount == 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ public class CommonConfig {
private boolean pipeEventReferenceTrackingEnabled = true;
private long pipeEventReferenceEliminateIntervalSeconds = 10;

private long pipeReceiverTabletBatchSize = 1024;

private boolean subscriptionEnabled = false;

private float subscriptionCacheMemoryUsagePercentage = 0.2F;
Expand Down Expand Up @@ -1987,6 +1989,21 @@ public void setPipeEventReferenceEliminateIntervalSeconds(
pipeEventReferenceEliminateIntervalSeconds);
}

public long getPipeReceiverTabletBatchSize() {
return pipeReceiverTabletBatchSize;
}

public void setPipeReceiverTabletBatchSize(long pipeReceiverTabletBatchSize) {
if (this.pipeReceiverTabletBatchSize
== pipeReceiverTabletBatchSize) {
return;
}
this.pipeReceiverTabletBatchSize = pipeReceiverTabletBatchSize;
logger.info(
"pipeReceiverTabletBatchSize is set to {}",
pipeReceiverTabletBatchSize);
}

public boolean getSubscriptionEnabled() {
return subscriptionEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ public static void loadPipeStaticConfig(CommonConfig config, TrimProperties prop
"pipe_listening_queue_transfer_snapshot_threshold",
String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));

config.setPipeListeningQueueTransferSnapshotThreshold(
Long.parseLong(
properties.getProperty(
"pipe_listening_queue_transfer_snapshot_threshold",
String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));

config.setPipeSnapshotExecutionMaxBatchSize(
Integer.parseInt(
properties.getProperty(
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
<tsfile.version>2.2.0-250722-SNAPSHOT</tsfile.version>
<tsfile.version>2.2.0-250730-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
Expand Down
Loading