diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java index 355c36db2aa3e..b9cf79c34ef23 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -99,12 +100,12 @@ public void testTableSync() throws Exception { BaseEnv.TABLE_SQL_DIALECT, senderEnv, Arrays.asList( - "create table table1(a id, b attribute, c int32) with (ttl=3000)", + "create table table1(a tag, b attribute, c int32) with (ttl=3000)", "alter table table1 add column d int64", "alter table table1 drop column c", "alter table table1 set properties ttl=default", "insert into table1 (a, b, d) values(1, 1, 1)", - "create table noTransferTable(a id, b attribute, c int32) with (ttl=3000)"), + "create table noTransferTable(a tag, b attribute, c int32) with (ttl=3000)"), null)) { return; } @@ -126,7 +127,7 @@ public void testTableSync() throws Exception { dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, - "insert into table1 (a, b) values(1, 2)", + "insert into table1 (a, b, d) values(1, 2, 1)", null)) { return; } @@ -148,7 +149,7 @@ public void testTableSync() throws Exception { } TestUtils.assertDataEventuallyOnEnv( - receiverEnv, "select * from table1", "a,b,d,", Collections.emptySet(), dbName); + receiverEnv, "select * from table1", "time,a,b,d,", Collections.emptySet(), dbName); if (!TestUtils.tryExecuteNonQueryWithRetry( dbName, @@ -177,9 +178,9 @@ public void testTableSync() throws Exception { new HashSet<>( Arrays.asList( "time,TIMESTAMP,TIME,", - "a,STRING,ID,", + "a,STRING,TAG,", "b,STRING,ATTRIBUTE,", - "d,INT64,MEASUREMENT,")), + "d,INT64,FIELD,")), dbName); if (!TestUtils.tryExecuteNonQueryWithRetry( @@ -300,7 +301,7 @@ public void testNoTable() throws Exception { BaseEnv.TABLE_SQL_DIALECT, senderEnv, Arrays.asList( - "create table table1(a id, b attribute, c int32) with (ttl=3000)", + "create table table1(a tag, b attribute, c int32) with (ttl=3000)", "alter table table1 add column d int64", "alter table table1 drop column b", "alter table table1 set properties ttl=default"), @@ -334,9 +335,8 @@ public void testAuth() throws Exception { return; } - final String dbName = "test"; if (!TestUtils.tryExecuteNonQueriesWithRetry( - dbName, + null, BaseEnv.TABLE_SQL_DIALECT, senderEnv, Arrays.asList( @@ -370,10 +370,11 @@ public void testAuth() throws Exception { Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client.startPipeExtended(new TStartPipeReq("testPipe").setIsTableModel(true)).getCode()); if (!TestUtils.tryExecuteNonQueryWithRetry( - dbName, + null, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "grant alter on any to user testUser with grant option", @@ -392,7 +393,7 @@ public void testAuth() throws Exception { ",,MAINTAIN,false,", ",*.*,ALTER,true,", ",test.*,DROP,false,")), - dbName); + (String) null); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java index bd2852a14b3b7..fc073226043ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java @@ -315,7 +315,11 @@ public Update visitTableDeviceAttributeUpdate( @Override public Delete visitDeleteData(final RelationalDeleteDataNode node, final Void context) { - final Delete statement = new Delete(); + final Delete statement = + new Delete( + new Table( + QualifiedName.of( + node.getDatabaseName(), node.getModEntries().get(0).getTableName()))); statement.setDatabaseName(node.getDatabaseName()); statement.setTableDeletionEntries(node.getModEntries()); return statement; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index 72dc0b5175461..4dc5fef859ffc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; -import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.source.IoTDBSource; import org.apache.iotdb.consensus.ConsensusFactory; @@ -237,11 +236,7 @@ public void validate(final PipeParameterValidator validator) throws Exception { EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE); // Validate tree pattern and table pattern - final TreePattern treePattern = - TreePattern.parsePipePatternFromSourceParameters(validator.getParameters()); - final TablePattern tablePattern = - TablePattern.parsePipePatternFromSourceParameters(validator.getParameters()); - validatePattern(treePattern, tablePattern); + validatePattern(TreePattern.parsePipePatternFromSourceParameters(validator.getParameters())); // Validate extractor.history.enable and extractor.realtime.enable validator @@ -302,7 +297,7 @@ public void validate(final PipeParameterValidator validator) throws Exception { realtimeExtractor.validate(validator); } - private void validatePattern(final TreePattern treePattern, final TablePattern tablePattern) { + private void validatePattern(final TreePattern treePattern) { if (!treePattern.isLegal()) { throw new IllegalArgumentException(String.format("Pattern \"%s\" is illegal.", treePattern)); } @@ -316,13 +311,6 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t "The path pattern %s is not valid for the source. Only prefix or full path is allowed.", treePattern)); } - - if (shouldExtractDeletion && tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) { - throw new IllegalArgumentException( - String.format( - "The table model pattern %s can not be specified when deletion capture is enabled.", - tablePattern)); - } } private void checkInvalidParameters(final PipeParameterValidator validator) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index dff966e72e895..8c49dd0a3dd8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeDataRegionAssigner; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.util.Objects; @@ -141,7 +142,9 @@ public DeletionResource listenToDeleteData( final String regionId, final AbstractDeleteDataNode node) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(regionId); // only events from registered data region will be extracted - if (assigner == null) { + if (assigner == null + || node instanceof RelationalDeleteDataNode + && ((RelationalDeleteDataNode) node).getModEntries().isEmpty()) { return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java index 973e71e97b5ac..11d70e0daa755 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; @@ -114,19 +113,19 @@ public PlanNodeType getType() { @Override public PlanNode clone() { - return new PipeEnrichedDeleteDataNode((DeleteDataNode) deleteDataNode.clone()); + return new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) deleteDataNode.clone()); } @Override public PlanNode createSubNode(final int subNodeId, final int startIndex, final int endIndex) { return new PipeEnrichedDeleteDataNode( - (DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, endIndex)); + (AbstractDeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, endIndex)); } @Override public PlanNode cloneWithChildren(final List children) { return new PipeEnrichedDeleteDataNode( - (DeleteDataNode) deleteDataNode.cloneWithChildren(children)); + (AbstractDeleteDataNode) deleteDataNode.cloneWithChildren(children)); } @Override @@ -157,7 +156,8 @@ protected void serializeAttributes(final DataOutputStream stream) throws IOExcep } public static PipeEnrichedDeleteDataNode deserialize(final ByteBuffer buffer) { - return new PipeEnrichedDeleteDataNode((DeleteDataNode) PlanNodeType.deserialize(buffer)); + return new PipeEnrichedDeleteDataNode( + (AbstractDeleteDataNode) PlanNodeType.deserialize(buffer)); } @Override @@ -183,7 +183,7 @@ public List splitByPartition(final IAnalysis analysis) { plan -> plan instanceof PipeEnrichedDeleteDataNode ? plan - : new PipeEnrichedDeleteDataNode((DeleteDataNode) plan)) + : new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) plan)) .collect(Collectors.toList()); } @@ -206,6 +206,6 @@ public SearchNode merge(List searchNodes) { (SearchNode) ((PipeEnrichedDeleteDataNode) searchNode).getDeleteDataNode()) .collect(Collectors.toList()); return new PipeEnrichedDeleteDataNode( - (DeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes)); + (AbstractDeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java index 72b8a8655892c..788b5ddffcd07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java @@ -44,8 +44,9 @@ public class Delete extends Statement { private String databaseName; private Collection replicaSets; - public Delete() { + public Delete(final Table table) { super(null); + this.table = requireNonNull(table, "table is null"); } public Delete(final NodeLocation location, final Table table) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java index 7ee8008c0e01f..46a3d3e2a865c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java @@ -33,12 +33,12 @@ public class PipeCommitInterval extends Interval { private final PipeTaskMeta pipeTaskMeta; public PipeCommitInterval( - final long s, - final long e, + final long start, + final long end, final ProgressIndex currentIndex, final List onCommittedHooks, final PipeTaskMeta pipeTaskMeta) { - super(s, e); + super(start, end); this.pipeTaskMeta = pipeTaskMeta; this.currentIndex = currentIndex; this.onCommittedHooks = onCommittedHooks; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java index 67ecaa66d6e66..45b51e3ee29a2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java @@ -23,9 +23,9 @@ public class Interval> implements Comparable> public long start; public long end; - public Interval(final long s, final long e) { - start = s; - end = e; + public Interval(final long start, final long end) { + this.start = start; + this.end = end; } public void onMerged(final T another) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java index 179a379b7c3a7..cecdef312de33 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java @@ -19,33 +19,38 @@ package org.apache.iotdb.commons.pipe.datastructure.interval; +import javax.annotation.concurrent.NotThreadSafe; + import java.util.TreeSet; +@NotThreadSafe public class IntervalManager> { private final TreeSet intervals = new TreeSet<>(); // insert into new interval and merge public void addInterval(final T newInterval) { // Left closest - final T left = intervals.floor(newInterval); + T left = intervals.floor(newInterval); // Right closest - final T right = intervals.ceiling(newInterval); + T right = intervals.ceiling(newInterval); // Merge left ([0,1] + [2,3] → [0,3]) - if (left != null && left.end >= newInterval.start - 1) { + while (left != null && left.end >= newInterval.start - 1) { newInterval.start = Math.min(left.start, newInterval.start); newInterval.end = Math.max(left.end, newInterval.end); newInterval.onMerged(left); intervals.remove(left); + left = intervals.floor(newInterval); } // Merge right ([2,3] + [3,4] → [2,4]) - if (right != null && newInterval.end >= right.start - 1) { + while (right != null && newInterval.end >= right.start - 1) { newInterval.start = Math.min(newInterval.start, right.start); newInterval.end = Math.max(newInterval.end, right.end); newInterval.onMerged(right); intervals.remove(right); + right = intervals.ceiling(newInterval); } intervals.add(newInterval);