Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
Expand Down Expand Up @@ -99,12 +100,12 @@ public void testTableSync() throws Exception {
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
"create table table1(a id, b attribute, c int32) with (ttl=3000)",
"create table table1(a tag, b attribute, c int32) with (ttl=3000)",
"alter table table1 add column d int64",
"alter table table1 drop column c",
"alter table table1 set properties ttl=default",
"insert into table1 (a, b, d) values(1, 1, 1)",
"create table noTransferTable(a id, b attribute, c int32) with (ttl=3000)"),
"create table noTransferTable(a tag, b attribute, c int32) with (ttl=3000)"),
null)) {
return;
}
Expand All @@ -126,7 +127,7 @@ public void testTableSync() throws Exception {
dbName,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
"insert into table1 (a, b) values(1, 2)",
"insert into table1 (a, b, d) values(1, 2, 1)",
null)) {
return;
}
Expand All @@ -148,7 +149,7 @@ public void testTableSync() throws Exception {
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select * from table1", "a,b,d,", Collections.emptySet(), dbName);
receiverEnv, "select * from table1", "time,a,b,d,", Collections.emptySet(), dbName);

if (!TestUtils.tryExecuteNonQueryWithRetry(
dbName,
Expand Down Expand Up @@ -177,9 +178,9 @@ public void testTableSync() throws Exception {
new HashSet<>(
Arrays.asList(
"time,TIMESTAMP,TIME,",
"a,STRING,ID,",
"a,STRING,TAG,",
"b,STRING,ATTRIBUTE,",
"d,INT64,MEASUREMENT,")),
"d,INT64,FIELD,")),
dbName);

if (!TestUtils.tryExecuteNonQueryWithRetry(
Expand Down Expand Up @@ -300,7 +301,7 @@ public void testNoTable() throws Exception {
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
"create table table1(a id, b attribute, c int32) with (ttl=3000)",
"create table table1(a tag, b attribute, c int32) with (ttl=3000)",
"alter table table1 add column d int64",
"alter table table1 drop column b",
"alter table table1 set properties ttl=default"),
Expand Down Expand Up @@ -334,9 +335,8 @@ public void testAuth() throws Exception {
return;
}

final String dbName = "test";
if (!TestUtils.tryExecuteNonQueriesWithRetry(
dbName,
null,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
Expand Down Expand Up @@ -370,10 +370,11 @@ public void testAuth() throws Exception {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipeExtended(new TStartPipeReq("testPipe").setIsTableModel(true)).getCode());

if (!TestUtils.tryExecuteNonQueryWithRetry(
dbName,
null,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
"grant alter on any to user testUser with grant option",
Expand All @@ -392,7 +393,7 @@ public void testAuth() throws Exception {
",,MAINTAIN,false,",
",*.*,ALTER,true,",
",test.*,DROP,false,")),
dbName);
(String) null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,11 @@ public Update visitTableDeviceAttributeUpdate(

@Override
public Delete visitDeleteData(final RelationalDeleteDataNode node, final Void context) {
final Delete statement = new Delete();
final Delete statement =
new Delete(
new Table(
QualifiedName.of(
node.getDatabaseName(), node.getModEntries().get(0).getTableName())));
statement.setDatabaseName(node.getDatabaseName());
statement.setTableDeletionEntries(node.getModEntries());
return statement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.source.IoTDBSource;
import org.apache.iotdb.consensus.ConsensusFactory;
Expand Down Expand Up @@ -237,11 +236,7 @@ public void validate(final PipeParameterValidator validator) throws Exception {
EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);

// Validate tree pattern and table pattern
final TreePattern treePattern =
TreePattern.parsePipePatternFromSourceParameters(validator.getParameters());
final TablePattern tablePattern =
TablePattern.parsePipePatternFromSourceParameters(validator.getParameters());
validatePattern(treePattern, tablePattern);
validatePattern(TreePattern.parsePipePatternFromSourceParameters(validator.getParameters()));

// Validate extractor.history.enable and extractor.realtime.enable
validator
Expand Down Expand Up @@ -302,7 +297,7 @@ public void validate(final PipeParameterValidator validator) throws Exception {
realtimeExtractor.validate(validator);
}

private void validatePattern(final TreePattern treePattern, final TablePattern tablePattern) {
private void validatePattern(final TreePattern treePattern) {
if (!treePattern.isLegal()) {
throw new IllegalArgumentException(String.format("Pattern \"%s\" is illegal.", treePattern));
}
Expand All @@ -316,13 +311,6 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t
"The path pattern %s is not valid for the source. Only prefix or full path is allowed.",
treePattern));
}

if (shouldExtractDeletion && tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) {
throw new IllegalArgumentException(
String.format(
"The table model pattern %s can not be specified when deletion capture is enabled.",
tablePattern));
}
}

private void checkInvalidParameters(final PipeParameterValidator validator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeDataRegionAssigner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;

import java.util.Objects;
Expand Down Expand Up @@ -141,7 +142,9 @@ public DeletionResource listenToDeleteData(
final String regionId, final AbstractDeleteDataNode node) {
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(regionId);
// only events from registered data region will be extracted
if (assigner == null) {
if (assigner == null
|| node instanceof RelationalDeleteDataNode
&& ((RelationalDeleteDataNode) node).getModEntries().isEmpty()) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;

Expand Down Expand Up @@ -114,19 +113,19 @@ public PlanNodeType getType() {

@Override
public PlanNode clone() {
return new PipeEnrichedDeleteDataNode((DeleteDataNode) deleteDataNode.clone());
return new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) deleteDataNode.clone());
}

@Override
public PlanNode createSubNode(final int subNodeId, final int startIndex, final int endIndex) {
return new PipeEnrichedDeleteDataNode(
(DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, endIndex));
(AbstractDeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, endIndex));
}

@Override
public PlanNode cloneWithChildren(final List<PlanNode> children) {
return new PipeEnrichedDeleteDataNode(
(DeleteDataNode) deleteDataNode.cloneWithChildren(children));
(AbstractDeleteDataNode) deleteDataNode.cloneWithChildren(children));
}

@Override
Expand Down Expand Up @@ -157,7 +156,8 @@ protected void serializeAttributes(final DataOutputStream stream) throws IOExcep
}

public static PipeEnrichedDeleteDataNode deserialize(final ByteBuffer buffer) {
return new PipeEnrichedDeleteDataNode((DeleteDataNode) PlanNodeType.deserialize(buffer));
return new PipeEnrichedDeleteDataNode(
(AbstractDeleteDataNode) PlanNodeType.deserialize(buffer));
}

@Override
Expand All @@ -183,7 +183,7 @@ public List<WritePlanNode> splitByPartition(final IAnalysis analysis) {
plan ->
plan instanceof PipeEnrichedDeleteDataNode
? plan
: new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
: new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) plan))
.collect(Collectors.toList());
}

Expand All @@ -206,6 +206,6 @@ public SearchNode merge(List<SearchNode> searchNodes) {
(SearchNode) ((PipeEnrichedDeleteDataNode) searchNode).getDeleteDataNode())
.collect(Collectors.toList());
return new PipeEnrichedDeleteDataNode(
(DeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
(AbstractDeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ public class Delete extends Statement {
private String databaseName;
private Collection<TRegionReplicaSet> replicaSets;

public Delete() {
public Delete(final Table table) {
super(null);
this.table = requireNonNull(table, "table is null");
}

public Delete(final NodeLocation location, final Table table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public class PipeCommitInterval extends Interval<PipeCommitInterval> {
private final PipeTaskMeta pipeTaskMeta;

public PipeCommitInterval(
final long s,
final long e,
final long start,
final long end,
final ProgressIndex currentIndex,
final List<Runnable> onCommittedHooks,
final PipeTaskMeta pipeTaskMeta) {
super(s, e);
super(start, end);
this.pipeTaskMeta = pipeTaskMeta;
this.currentIndex = currentIndex;
this.onCommittedHooks = onCommittedHooks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ public class Interval<T extends Interval<T>> implements Comparable<Interval<?>>
public long start;
public long end;

public Interval(final long s, final long e) {
start = s;
end = e;
public Interval(final long start, final long end) {
this.start = start;
this.end = end;
}

public void onMerged(final T another) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,38 @@

package org.apache.iotdb.commons.pipe.datastructure.interval;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.TreeSet;

@NotThreadSafe
public class IntervalManager<T extends Interval<T>> {
private final TreeSet<T> intervals = new TreeSet<>();

// insert into new interval and merge
public void addInterval(final T newInterval) {
// Left closest
final T left = intervals.floor(newInterval);
T left = intervals.floor(newInterval);

// Right closest
final T right = intervals.ceiling(newInterval);
T right = intervals.ceiling(newInterval);

// Merge left ([0,1] + [2,3] → [0,3])
if (left != null && left.end >= newInterval.start - 1) {
while (left != null && left.end >= newInterval.start - 1) {
newInterval.start = Math.min(left.start, newInterval.start);
newInterval.end = Math.max(left.end, newInterval.end);
newInterval.onMerged(left);
intervals.remove(left);
left = intervals.floor(newInterval);
}

// Merge right ([2,3] + [3,4] → [2,4])
if (right != null && newInterval.end >= right.start - 1) {
while (right != null && newInterval.end >= right.start - 1) {
newInterval.start = Math.min(newInterval.start, right.start);
newInterval.end = Math.max(newInterval.end, right.end);
newInterval.onMerged(right);
intervals.remove(right);
right = intervals.ceiling(newInterval);
}

intervals.add(newInterval);
Expand Down
Loading