Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,16 @@ public void testIllegalPassword() throws Exception {
"count(root.vehicle.plane.pressure),",
Collections.singleton("1,"));

// After restart, the pipe keeps retrying with the stale password and may trigger login lock.
statement.execute("alter user thulab account unlock");

try {
statement.execute("alter pipe a2b modify source ('password'='fake')");
} catch (final SQLException e) {
Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage());
}

statement.execute("alter user thulab account unlock");
statement.execute("alter pipe a2b modify source ('password'='newST@ongPassword')");

// Test empty alter
Expand All @@ -620,6 +624,7 @@ public void testIllegalPassword() throws Exception {
statement = connection.createStatement();
TestUtils.executeNonQuery(
senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)");
statement.execute("alter user thulab account unlock");
statement.execute("alter user thulab set password 'newST@ongPassword'");
statement.execute("alter pipe a2b");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();

private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
private final List<String> insertNodeDataBases = new ArrayList<>();
private final List<String> tabletDataBases = new ArrayList<>();

Expand Down Expand Up @@ -160,13 +159,14 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws
buffer = insertNode.serializeToByteBuffer();
insertNodeBuffers.add(buffer);
if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
estimateSize =
RamUsageEstimator.sizeOf(
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
final String databaseName =
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName();
estimateSize = RamUsageEstimator.sizeOf(databaseName);
insertNodeDataBases.add(databaseName);
} else {
estimateSize = 4;
insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
final String databaseName = pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
estimateSize = RamUsageEstimator.sizeOf(databaseName);
insertNodeDataBases.add(databaseName);
}
estimateSize += buffer.limit();
} else {
Expand All @@ -192,9 +192,10 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws
ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), outputStream);
buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
estimateSize = 4 + buffer.limit();
final String databaseName = pipeRawTabletInsertionEvent.getTreeModelDatabaseName();
estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit();
tabletBuffers.add(buffer);
tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
tabletDataBases.add(databaseName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -52,17 +52,15 @@
// Empty constructor
}

public List<InsertBaseStatement> constructStatements() {

Check warning on line 55 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 83 to 64, Complexity from 16 to 14, Nesting Level from 5 to 2, Number of Variables from 18 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4-Emmu8JHXHMTxCsal&open=AZ4-Emmu8JHXHMTxCsal&pullRequest=17713
final List<InsertBaseStatement> statements = new ArrayList<>();

final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
final InsertMultiTabletsStatement insertMultiTabletsStatement =
new InsertMultiTabletsStatement();

final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
final Map<String, List<InsertRowStatement>> tableModelDatabaseInsertRowStatementMap =
new HashMap<>();
new LinkedHashMap<>();
final Map<String, List<InsertRowStatement>> treeModelDatabaseInsertRowStatementMap =
new LinkedHashMap<>();
final Map<String, List<InsertTabletStatement>> treeModelDatabaseInsertTabletStatementMap =
new LinkedHashMap<>();

for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq : insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
Expand All @@ -77,9 +75,12 @@
} else if (statement instanceof InsertTabletStatement) {
statements.add(statement);
} else if (statement instanceof InsertRowsStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.addAll(((InsertRowsStatement) statement).getInsertRowStatementList());
for (final InsertRowStatement insertRowStatement :
((InsertRowsStatement) statement).getInsertRowStatementList()) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(insertRowStatement.getDatabaseName().get(), k -> new ArrayList<>())
.add(insertRowStatement);
}
} else {
throw new UnsupportedOperationException(
String.format(
Expand All @@ -89,12 +90,21 @@
continue;
}
if (statement instanceof InsertRowStatement) {
insertRowStatementList.add((InsertRowStatement) statement);
treeModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new ArrayList<>())
.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
treeModelDatabaseInsertTabletStatementMap
.computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new ArrayList<>())
.add((InsertTabletStatement) statement);
} else if (statement instanceof InsertRowsStatement) {
insertRowStatementList.addAll(
((InsertRowsStatement) statement).getInsertRowStatementList());
for (final InsertRowStatement insertRowStatement :
((InsertRowsStatement) statement).getInsertRowStatementList()) {
treeModelDatabaseInsertRowStatementMap
.computeIfAbsent(
insertRowStatement.getDatabaseName().orElse(null), k -> new ArrayList<>())
.add(insertRowStatement);
}
} else {
throw new UnsupportedOperationException(
String.format(
Expand All @@ -112,17 +122,13 @@
statements.add(statement);
continue;
}
insertTabletStatementList.add(statement);
treeModelDatabaseInsertTabletStatementMap
.computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new ArrayList<>())
.add(statement);
}

insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
if (!insertRowsStatement.isEmpty()) {
statements.add(insertRowsStatement);
}
if (!insertMultiTabletsStatement.isEmpty()) {
statements.add(insertMultiTabletsStatement);
}
addTreeModelInsertRowsStatements(statements, treeModelDatabaseInsertRowStatementMap);
addTreeModelInsertTabletsStatements(statements, treeModelDatabaseInsertTabletStatementMap);

for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
tableModelDatabaseInsertRowStatementMap.entrySet()) {
Expand All @@ -136,6 +142,34 @@
return statements;
}

private void addTreeModelInsertRowsStatements(
final List<InsertBaseStatement> statements,
final Map<String, List<InsertRowStatement>> databaseInsertRowStatementMap) {
for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
databaseInsertRowStatementMap.entrySet()) {
final InsertRowsStatement statement = new InsertRowsStatement();
statement.setInsertRowStatementList(insertRows.getValue());
if (insertRows.getKey() != null) {
statement.setDatabaseName(insertRows.getKey());
}
statements.add(statement);
}
}

private void addTreeModelInsertTabletsStatements(
final List<InsertBaseStatement> statements,
final Map<String, List<InsertTabletStatement>> databaseInsertTabletStatementMap) {
for (final Map.Entry<String, List<InsertTabletStatement>> insertTablets :
databaseInsertTabletStatementMap.entrySet()) {
final InsertMultiTabletsStatement statement = new InsertMultiTabletsStatement();
statement.setInsertTabletStatementList(insertTablets.getValue());
if (insertTablets.getKey() != null) {
statement.setDatabaseName(insertTablets.getKey());
}
statements.add(statement);
}
}

/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
Expand Down Expand Up @@ -72,14 +73,18 @@ public InsertBaseStatement constructStatement() {
return statement;
}

// Table model
statement.setWriteToTable(true);
final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
if (isTableModel) {
statement.setWriteToTable(true);
}
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
insertRowStatement.setWriteToTable(true);
if (isTableModel) {
insertRowStatement.setWriteToTable(true);
}
insertRowStatement.setDatabaseName(dataBaseName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
Expand Down Expand Up @@ -72,14 +73,18 @@ public InsertBaseStatement constructStatement() {
return statement;
}

// Table model
statement.setWriteToTable(true);
final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
if (isTableModel) {
statement.setWriteToTable(true);
}
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
insertRowStatement.setWriteToTable(true);
if (isTableModel) {
insertRowStatement.setWriteToTable(true);
}
insertRowStatement.setDatabaseName(dataBaseName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
Expand Down Expand Up @@ -54,29 +55,45 @@ public String getDataBaseName() {

@Override
public InsertTabletStatement constructStatement() {
final boolean isTableModel =
Objects.nonNull(dataBaseName) && PathUtils.isTableModelDatabase(dataBaseName);

if (statement != null) {
if (Objects.isNull(dataBaseName)) {
new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
} else {
if (isTableModel) {
new PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
statement.setWriteToTable(true);
} else {
new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
}
if (Objects.nonNull(dataBaseName)) {
statement.setDatabaseName(dataBaseName);
}

return statement;
}

if (Objects.isNull(dataBaseName)) {
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
} else {
if (isTableModel) {
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
} else {
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
}

try {
if (isTabletEmpty(tablet)) {
// Empty statement, will be filtered after construction
return new InsertTabletStatement();
statement = new InsertTabletStatement();
return statement;
}

return new InsertTabletStatement(tablet, isAligned, dataBaseName);
if (isTableModel) {
statement = new InsertTabletStatement(tablet, isAligned, dataBaseName);
} else {
statement = new InsertTabletStatement(tablet, isAligned, null);
if (Objects.nonNull(dataBaseName)) {
statement.setDatabaseName(dataBaseName);
}
}
return statement;
} catch (final MetadataException e) {
LOGGER.warn(DataNodePipeMessages.GENERATE_STATEMENT_FROM_TABLET_ERROR, tablet, e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void doTransfer(
insertNode,
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: null);
: pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName());

if (!send(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
Expand Down Expand Up @@ -290,7 +290,7 @@ private void doTransfer(
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: null))) {
: pipeRawTabletInsertionEvent.getTreeModelDatabaseName()))) {
final String errorMessage =
String.format(
"Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ private boolean transferInEventWithoutCheck(final TabletInsertionEvent tabletIns
final String databaseName =
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: null;
: pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
final TPipeTransferReq pipeTransferReq =
compressIfNeeded(
PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, databaseName));
Expand All @@ -327,7 +327,7 @@ private boolean transferInEventWithoutCheck(final TabletInsertionEvent tabletIns
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: null));
: pipeRawTabletInsertionEvent.getTreeModelDatabaseName()));
final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
new PipeTransferTabletRawEventHandler(
pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private void doTransfer(
insertNode,
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: null));
: pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()));
rateLimitIfNeeded(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
pipeInsertNodeTabletInsertionEvent.getCreationTime(),
Expand Down Expand Up @@ -452,7 +452,7 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: null));
: pipeRawTabletInsertionEvent.getTreeModelDatabaseName()));
rateLimitIfNeeded(
pipeRawTabletInsertionEvent.getPipeName(),
pipeRawTabletInsertionEvent.getCreationTime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ public class WriteBackSink implements PipeConnector {

private UserEntity userEntity;

private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;

private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser();

private static final Set<String> ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet();
Expand Down Expand Up @@ -268,7 +266,7 @@ private void doTransfer(
final String dataBaseName =
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;
: pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();

final InsertBaseStatement insertBaseStatement;
insertBaseStatement =
Expand Down Expand Up @@ -311,7 +309,7 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion
final String dataBaseName =
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;
: pipeRawTabletInsertionEvent.getTreeModelDatabaseName();

final InsertTabletStatement insertTabletStatement =
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
Expand Down
Loading
Loading