diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index cf6eaf6a9c5b2..d43bb6564b11b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -590,12 +590,16 @@ public void testIllegalPassword() throws Exception { "count(root.vehicle.plane.pressure),", Collections.singleton("1,")); + // After restart, the pipe keeps retrying with the stale password and may trigger login lock. + statement.execute("alter user thulab account unlock"); + try { statement.execute("alter pipe a2b modify source ('password'='fake')"); } catch (final SQLException e) { Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage()); } + statement.execute("alter user thulab account unlock"); statement.execute("alter pipe a2b modify source ('password'='newST@ongPassword')"); // Test empty alter @@ -620,6 +624,7 @@ public void testIllegalPassword() throws Exception { statement = connection.createStatement(); TestUtils.executeNonQuery( senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"); + statement.execute("alter user thulab account unlock"); statement.execute("alter user thulab set password 'newST@ongPassword'"); statement.execute("alter pipe a2b"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index 065ad3be8401f..34837424b9828 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -54,7 +54,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { private final List insertNodeBuffers = new ArrayList<>(); private final List tabletBuffers = new ArrayList<>(); - private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null; private final List insertNodeDataBases = new ArrayList<>(); private final List tabletDataBases = new ArrayList<>(); @@ -160,13 +159,14 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws buffer = insertNode.serializeToByteBuffer(); insertNodeBuffers.add(buffer); if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) { - estimateSize = - RamUsageEstimator.sizeOf( - pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); - insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); + final String databaseName = + pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName(); + estimateSize = RamUsageEstimator.sizeOf(databaseName); + insertNodeDataBases.add(databaseName); } else { - estimateSize = 4; - insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER); + final String databaseName = pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); + estimateSize = RamUsageEstimator.sizeOf(databaseName); + insertNodeDataBases.add(databaseName); } estimateSize += buffer.limit(); } else { @@ -192,9 +192,10 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), outputStream); buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } - estimateSize = 4 + buffer.limit(); + final String databaseName = pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); + estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit(); tabletBuffers.add(buffer); - tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER); + tabletDataBases.add(databaseName); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java index f626d496b5563..80550b6350f05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java @@ -38,7 +38,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -55,14 +55,12 @@ private PipeTransferTabletBatchReqV2() { public List constructStatements() { final List statements = new ArrayList<>(); - final InsertRowsStatement insertRowsStatement = new InsertRowsStatement(); - final InsertMultiTabletsStatement insertMultiTabletsStatement = - new InsertMultiTabletsStatement(); - - final List insertRowStatementList = new ArrayList<>(); - final List insertTabletStatementList = new ArrayList<>(); final Map> tableModelDatabaseInsertRowStatementMap = - new HashMap<>(); + new LinkedHashMap<>(); + final Map> treeModelDatabaseInsertRowStatementMap = + new LinkedHashMap<>(); + final Map> treeModelDatabaseInsertTabletStatementMap = + new LinkedHashMap<>(); for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq : insertNodeReqs) { final InsertBaseStatement statement = insertNodeReq.constructStatement(); @@ -77,9 +75,12 @@ public List constructStatements() { } else if (statement instanceof InsertTabletStatement) { statements.add(statement); } else if (statement instanceof InsertRowsStatement) { - tableModelDatabaseInsertRowStatementMap - .computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>()) - .addAll(((InsertRowsStatement) statement).getInsertRowStatementList()); + for (final InsertRowStatement insertRowStatement : + ((InsertRowsStatement) statement).getInsertRowStatementList()) { + tableModelDatabaseInsertRowStatementMap + .computeIfAbsent(insertRowStatement.getDatabaseName().get(), k -> new ArrayList<>()) + .add(insertRowStatement); + } } else { throw new UnsupportedOperationException( String.format( @@ -89,12 +90,21 @@ public List constructStatements() { continue; } if (statement instanceof InsertRowStatement) { - insertRowStatementList.add((InsertRowStatement) statement); + treeModelDatabaseInsertRowStatementMap + .computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new ArrayList<>()) + .add((InsertRowStatement) statement); } else if (statement instanceof InsertTabletStatement) { - insertTabletStatementList.add((InsertTabletStatement) statement); + treeModelDatabaseInsertTabletStatementMap + .computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new ArrayList<>()) + .add((InsertTabletStatement) statement); } else if (statement instanceof InsertRowsStatement) { - insertRowStatementList.addAll( - ((InsertRowsStatement) statement).getInsertRowStatementList()); + for (final InsertRowStatement insertRowStatement : + ((InsertRowsStatement) statement).getInsertRowStatementList()) { + treeModelDatabaseInsertRowStatementMap + .computeIfAbsent( + insertRowStatement.getDatabaseName().orElse(null), k -> new ArrayList<>()) + .add(insertRowStatement); + } } else { throw new UnsupportedOperationException( String.format( @@ -112,17 +122,13 @@ public List constructStatements() { statements.add(statement); continue; } - insertTabletStatementList.add(statement); + treeModelDatabaseInsertTabletStatementMap + .computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new ArrayList<>()) + .add(statement); } - insertRowsStatement.setInsertRowStatementList(insertRowStatementList); - insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); - if (!insertRowsStatement.isEmpty()) { - statements.add(insertRowsStatement); - } - if (!insertMultiTabletsStatement.isEmpty()) { - statements.add(insertMultiTabletsStatement); - } + addTreeModelInsertRowsStatements(statements, treeModelDatabaseInsertRowStatementMap); + addTreeModelInsertTabletsStatements(statements, treeModelDatabaseInsertTabletStatementMap); for (final Map.Entry> insertRows : tableModelDatabaseInsertRowStatementMap.entrySet()) { @@ -136,6 +142,34 @@ public List constructStatements() { return statements; } + private void addTreeModelInsertRowsStatements( + final List statements, + final Map> databaseInsertRowStatementMap) { + for (final Map.Entry> insertRows : + databaseInsertRowStatementMap.entrySet()) { + final InsertRowsStatement statement = new InsertRowsStatement(); + statement.setInsertRowStatementList(insertRows.getValue()); + if (insertRows.getKey() != null) { + statement.setDatabaseName(insertRows.getKey()); + } + statements.add(statement); + } + } + + private void addTreeModelInsertTabletsStatements( + final List statements, + final Map> databaseInsertTabletStatementMap) { + for (final Map.Entry> insertTablets : + databaseInsertTabletStatementMap.entrySet()) { + final InsertMultiTabletsStatement statement = new InsertMultiTabletsStatement(); + statement.setInsertTabletStatementList(insertTablets.getValue()); + if (insertTablets.getKey() != null) { + statement.setDatabaseName(insertTablets.getKey()); + } + statements.add(statement); + } + } + /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTabletBatchReqV2 toTPipeTransferReq( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java index 8166e6b07ef10..75501fe0f22e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -72,14 +73,18 @@ public InsertBaseStatement constructStatement() { return statement; } - // Table model - statement.setWriteToTable(true); + final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName); + if (isTableModel) { + statement.setWriteToTable(true); + } if (statement instanceof InsertRowsStatement) { List rowStatements = ((InsertRowsStatement) statement).getInsertRowStatementList(); if (rowStatements != null && !rowStatements.isEmpty()) { for (InsertRowStatement insertRowStatement : rowStatements) { - insertRowStatement.setWriteToTable(true); + if (isTableModel) { + insertRowStatement.setWriteToTable(true); + } insertRowStatement.setDatabaseName(dataBaseName); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java index 599e12af9edaf..e39330b5b0e25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; @@ -72,14 +73,18 @@ public InsertBaseStatement constructStatement() { return statement; } - // Table model - statement.setWriteToTable(true); + final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName); + if (isTableModel) { + statement.setWriteToTable(true); + } if (statement instanceof InsertRowsStatement) { List rowStatements = ((InsertRowsStatement) statement).getInsertRowStatementList(); if (rowStatements != null && !rowStatements.isEmpty()) { for (InsertRowStatement insertRowStatement : rowStatements) { - insertRowStatement.setWriteToTable(true); + if (isTableModel) { + insertRowStatement.setWriteToTable(true); + } insertRowStatement.setDatabaseName(dataBaseName); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java index f6b910a8844ed..2458e5e243f98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter; @@ -54,29 +55,45 @@ public String getDataBaseName() { @Override public InsertTabletStatement constructStatement() { + final boolean isTableModel = + Objects.nonNull(dataBaseName) && PathUtils.isTableModelDatabase(dataBaseName); + if (statement != null) { - if (Objects.isNull(dataBaseName)) { - new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary(); - } else { + if (isTableModel) { new PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary(); + statement.setWriteToTable(true); + } else { + new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary(); + } + if (Objects.nonNull(dataBaseName)) { + statement.setDatabaseName(dataBaseName); } return statement; } - if (Objects.isNull(dataBaseName)) { - new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); - } else { + if (isTableModel) { new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary(); + } else { + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); } try { if (isTabletEmpty(tablet)) { // Empty statement, will be filtered after construction - return new InsertTabletStatement(); + statement = new InsertTabletStatement(); + return statement; } - return new InsertTabletStatement(tablet, isAligned, dataBaseName); + if (isTableModel) { + statement = new InsertTabletStatement(tablet, isAligned, dataBaseName); + } else { + statement = new InsertTabletStatement(tablet, isAligned, null); + if (Objects.nonNull(dataBaseName)) { + statement.setDatabaseName(dataBaseName); + } + } + return statement; } catch (final MetadataException e) { LOGGER.warn(DataNodePipeMessages.GENERATE_STATEMENT_FROM_TABLET_ERROR, tablet, e); return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 27740b12e07a2..3bcdcaf02e6e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -243,7 +243,7 @@ private void doTransfer( insertNode, pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : null); + : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()); if (!send( pipeInsertNodeTabletInsertionEvent.getPipeName(), @@ -290,7 +290,7 @@ private void doTransfer( pipeRawTabletInsertionEvent.isAligned(), pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() - : null))) { + : pipeRawTabletInsertionEvent.getTreeModelDatabaseName()))) { final String errorMessage = String.format( "Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 14c854fa2addd..9adbcf6cf16d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -300,7 +300,7 @@ private boolean transferInEventWithoutCheck(final TabletInsertionEvent tabletIns final String databaseName = pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : null; + : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); final TPipeTransferReq pipeTransferReq = compressIfNeeded( PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, databaseName)); @@ -327,7 +327,7 @@ private boolean transferInEventWithoutCheck(final TabletInsertionEvent tabletIns pipeRawTabletInsertionEvent.isAligned(), pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() - : null)); + : pipeRawTabletInsertionEvent.getTreeModelDatabaseName())); final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler = new PipeTransferTabletRawEventHandler( pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 6cc416f4022a8..5e6297d843851 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -387,7 +387,7 @@ private void doTransfer( insertNode, pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : null)); + : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName())); rateLimitIfNeeded( pipeInsertNodeTabletInsertionEvent.getPipeName(), pipeInsertNodeTabletInsertionEvent.getCreationTime(), @@ -452,7 +452,7 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion pipeRawTabletInsertionEvent.isAligned(), pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() - : null)); + : pipeRawTabletInsertionEvent.getTreeModelDatabaseName())); rateLimitIfNeeded( pipeRawTabletInsertionEvent.getPipeName(), pipeRawTabletInsertionEvent.getCreationTime(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 4c0795f12cdb3..581792475c095 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -121,8 +121,6 @@ public class WriteBackSink implements PipeConnector { private UserEntity userEntity; - private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null; - private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser(); private static final Set ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet(); @@ -268,7 +266,7 @@ private void doTransfer( final String dataBaseName = pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : TREE_MODEL_DATABASE_NAME_IDENTIFIER; + : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); final InsertBaseStatement insertBaseStatement; insertBaseStatement = @@ -311,7 +309,7 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion final String dataBaseName = pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() - : TREE_MODEL_DATABASE_NAME_IDENTIFIER; + : pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); final InsertTabletStatement insertTabletStatement = PipeTransferTabletRawReqV2.toTPipeTransferRawReq( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index c5b9ebed4d5f2..e8b8e36cb49c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; @@ -219,16 +220,22 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( final String databaseName = ReadWriteIOUtils.readString(byteBuffer); if (databaseName != null) { statement.setDatabaseName(databaseName); - statement.setWriteToTable(true); - // For table model, insertTargetName is table name, convert to lowercase - statement.setDevicePath(new PartialPath(insertTargetName.toLowerCase(), false)); // Calculate memory for databaseName memorySize += org.apache.tsfile.utils.RamUsageEstimator.sizeOf(databaseName); - statement.setColumnCategories(columnCategories); - - memorySize += columnCategoriesMemorySize; - memorySize += tagColumnIndicesSize; + if (PathUtils.isTableModelDatabase(databaseName)) { + statement.setWriteToTable(true); + // For table model, insertTargetName is table name, convert to lowercase + statement.setDevicePath(new PartialPath(insertTargetName.toLowerCase(), false)); + statement.setColumnCategories(columnCategories); + + memorySize += columnCategoriesMemorySize; + memorySize += tagColumnIndicesSize; + } else { + statement.setDevicePath( + DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName)); + statement.setColumnCategories(null); + } } else { // For tree model, use DataNodeDevicePathCache statement.setDevicePath( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 1246ec9049c5b..38704ec7fea18 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -44,6 +44,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -66,7 +68,9 @@ import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class PipeDataNodeThriftRequestTest { @@ -201,6 +205,33 @@ public void testPipeTransferInsertNodeReqV2() { Assert.assertEquals(statement.getDatabaseName().get(), "test"); } + @Test + public void testPipeTransferInsertNodeReqV2WithTreeModelDatabase() { + final PipeTransferTabletInsertNodeReqV2 req = + PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq( + new InsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "test", "d"}), + false, + new String[] {"s"}, + new TSDataType[] {TSDataType.INT32}, + 1, + new Object[] {1}, + false), + "root.test"); + final PipeTransferTabletInsertNodeReqV2 deserializeReq = + PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req); + + final InsertBaseStatement statement = deserializeReq.constructStatement(); + final List paths = new ArrayList<>(); + paths.add(new PartialPath(new String[] {"root", "test", "d", "s"})); + + Assert.assertEquals(statement.getPaths(), paths); + Assert.assertFalse(statement.isWriteToTable()); + Assert.assertTrue(statement.getDatabaseName().isPresent()); + Assert.assertEquals("root.test", statement.getDatabaseName().get()); + } + @Test public void testPipeTransferTabletBinaryReq() { // Not do real test here since "serializeToWal" needs private inner class of walBuffer @@ -365,6 +396,39 @@ public void testPipeTransferTabletReqV2() { } } + @Test + public void testPipeTransferTabletReqV2WithTreeModelDatabase() { + try { + final List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT32)); + schemaList.add(new MeasurementSchema("s2", TSDataType.TEXT)); + final Tablet tablet = new Tablet("root.test.d", schemaList, 8); + tablet.addTimestamp(0, 2); + tablet.addTimestamp(1, 1); + tablet.addValue("s1", 0, 2); + tablet.addValue("s2", 0, "2"); + tablet.addValue("s1", 1, 1); + tablet.addValue("s2", 1, "1"); + + final PipeTransferTabletRawReqV2 req = + PipeTransferTabletRawReqV2.toTPipeTransferReq(tablet, false, "root.test"); + final PipeTransferTabletRawReqV2 deserializeReq = + PipeTransferTabletRawReqV2.fromTPipeTransferReq(req); + + final InsertBaseStatement statement = deserializeReq.constructStatement(); + final List paths = new ArrayList<>(); + paths.add(new PartialPath(new String[] {"root", "test", "d", "s1"})); + paths.add(new PartialPath(new String[] {"root", "test", "d", "s2"})); + + Assert.assertEquals(paths, statement.getPaths()); + Assert.assertFalse(statement.isWriteToTable()); + Assert.assertTrue(statement.getDatabaseName().isPresent()); + Assert.assertEquals("root.test", statement.getDatabaseName().get()); + } catch (final IOException e) { + Assert.fail(); + } + } + @Test public void testPipeTransferTabletBatchReq() throws IOException { final List insertNodeBuffers = new ArrayList<>(); @@ -503,6 +567,94 @@ public void testPipeTransferTabletBatchReqV2() throws IOException { Assert.assertEquals("test", deserializedReq.getInsertNodeReqs().get(0).getDataBaseName()); } + @Test + public void testPipeTransferTabletBatchReqV2WithMultipleTreeModelDatabases() throws IOException { + final List insertNodeBuffers = new ArrayList<>(); + final List tabletBuffers = new ArrayList<>(); + final List insertDataBase = new ArrayList<>(); + final List tabletDataBase = new ArrayList<>(); + + insertNodeBuffers.add( + new InsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "db1", "d"}), + false, + new String[] {"s"}, + new TSDataType[] {TSDataType.INT32}, + 1, + new Object[] {1}, + false) + .serializeToByteBuffer()); + insertDataBase.add("root.db1"); + + insertNodeBuffers.add( + new InsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "db2", "d"}), + false, + new String[] {"s"}, + new TSDataType[] {TSDataType.INT32}, + 2, + new Object[] {2}, + false) + .serializeToByteBuffer()); + insertDataBase.add("root.db2"); + + final List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT32)); + + final Tablet db1Tablet = new Tablet("root.db1.d", schemaList, 8); + db1Tablet.addTimestamp(0, 1); + db1Tablet.addValue("s1", 0, 1); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + db1Tablet.serialize(outputStream); + ReadWriteIOUtils.write(false, outputStream); + tabletBuffers.add( + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); + tabletDataBase.add("root.db1"); + } + + final Tablet db2Tablet = new Tablet("root.db2.d", schemaList, 8); + db2Tablet.addTimestamp(0, 2); + db2Tablet.addValue("s1", 0, 2); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + db2Tablet.serialize(outputStream); + ReadWriteIOUtils.write(false, outputStream); + tabletBuffers.add( + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); + tabletDataBase.add("root.db2"); + } + + final PipeTransferTabletBatchReqV2 req = + PipeTransferTabletBatchReqV2.toTPipeTransferReq( + insertNodeBuffers, tabletBuffers, insertDataBase, tabletDataBase); + final PipeTransferTabletBatchReqV2 deserializedReq = + PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req); + + final List statements = deserializedReq.constructStatements(); + final Set insertRowsDatabases = new HashSet<>(); + final Set insertTabletsDatabases = new HashSet<>(); + + for (final InsertBaseStatement statement : statements) { + Assert.assertFalse(statement.isWriteToTable()); + Assert.assertTrue(statement.getDatabaseName().isPresent()); + if (statement instanceof InsertRowsStatement) { + insertRowsDatabases.add(statement.getDatabaseName().get()); + } else if (statement instanceof InsertMultiTabletsStatement) { + insertTabletsDatabases.add(statement.getDatabaseName().get()); + } else { + Assert.fail("Unexpected statement type: " + statement.getClass().getName()); + } + } + + Assert.assertEquals( + new HashSet<>(java.util.Arrays.asList("root.db1", "root.db2")), insertRowsDatabases); + Assert.assertEquals( + new HashSet<>(java.util.Arrays.asList("root.db1", "root.db2")), insertTabletsDatabases); + } + @Test public void testPipeTransferFilePieceReq() throws IOException { final byte[] body = "testPipeTransferFilePieceReq".getBytes();