Skip to content

Commit 54ca825

Browse files
authored
Pipe: Fixed the table deletion transfer logic & pipe meta IT & interval manager in general model (#16268)
* ft * remove ancient check * refactor * generic-fix * Update IntervalManager.java * fix * ancient-bug * push
1 parent 922330e commit 54ca825

9 files changed

Lines changed: 48 additions & 46 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
2424
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
25+
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
2526
import org.apache.iotdb.db.it.utils.TestUtils;
2627
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
2728
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -99,12 +100,12 @@ public void testTableSync() throws Exception {
99100
BaseEnv.TABLE_SQL_DIALECT,
100101
senderEnv,
101102
Arrays.asList(
102-
"create table table1(a id, b attribute, c int32) with (ttl=3000)",
103+
"create table table1(a tag, b attribute, c int32) with (ttl=3000)",
103104
"alter table table1 add column d int64",
104105
"alter table table1 drop column c",
105106
"alter table table1 set properties ttl=default",
106107
"insert into table1 (a, b, d) values(1, 1, 1)",
107-
"create table noTransferTable(a id, b attribute, c int32) with (ttl=3000)"),
108+
"create table noTransferTable(a tag, b attribute, c int32) with (ttl=3000)"),
108109
null)) {
109110
return;
110111
}
@@ -126,7 +127,7 @@ public void testTableSync() throws Exception {
126127
dbName,
127128
BaseEnv.TABLE_SQL_DIALECT,
128129
senderEnv,
129-
"insert into table1 (a, b) values(1, 2)",
130+
"insert into table1 (a, b, d) values(1, 2, 1)",
130131
null)) {
131132
return;
132133
}
@@ -148,7 +149,7 @@ public void testTableSync() throws Exception {
148149
}
149150

150151
TestUtils.assertDataEventuallyOnEnv(
151-
receiverEnv, "select * from table1", "a,b,d,", Collections.emptySet(), dbName);
152+
receiverEnv, "select * from table1", "time,a,b,d,", Collections.emptySet(), dbName);
152153

153154
if (!TestUtils.tryExecuteNonQueryWithRetry(
154155
dbName,
@@ -177,9 +178,9 @@ public void testTableSync() throws Exception {
177178
new HashSet<>(
178179
Arrays.asList(
179180
"time,TIMESTAMP,TIME,",
180-
"a,STRING,ID,",
181+
"a,STRING,TAG,",
181182
"b,STRING,ATTRIBUTE,",
182-
"d,INT64,MEASUREMENT,")),
183+
"d,INT64,FIELD,")),
183184
dbName);
184185

185186
if (!TestUtils.tryExecuteNonQueryWithRetry(
@@ -300,7 +301,7 @@ public void testNoTable() throws Exception {
300301
BaseEnv.TABLE_SQL_DIALECT,
301302
senderEnv,
302303
Arrays.asList(
303-
"create table table1(a id, b attribute, c int32) with (ttl=3000)",
304+
"create table table1(a tag, b attribute, c int32) with (ttl=3000)",
304305
"alter table table1 add column d int64",
305306
"alter table table1 drop column b",
306307
"alter table table1 set properties ttl=default"),
@@ -334,9 +335,8 @@ public void testAuth() throws Exception {
334335
return;
335336
}
336337

337-
final String dbName = "test";
338338
if (!TestUtils.tryExecuteNonQueriesWithRetry(
339-
dbName,
339+
null,
340340
BaseEnv.TABLE_SQL_DIALECT,
341341
senderEnv,
342342
Arrays.asList(
@@ -370,10 +370,11 @@ public void testAuth() throws Exception {
370370
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
371371

372372
Assert.assertEquals(
373-
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
373+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
374+
client.startPipeExtended(new TStartPipeReq("testPipe").setIsTableModel(true)).getCode());
374375

375376
if (!TestUtils.tryExecuteNonQueryWithRetry(
376-
dbName,
377+
null,
377378
BaseEnv.TABLE_SQL_DIALECT,
378379
senderEnv,
379380
"grant alter on any to user testUser with grant option",
@@ -392,7 +393,7 @@ public void testAuth() throws Exception {
392393
",,MAINTAIN,false,",
393394
",*.*,ALTER,true,",
394395
",test.*,DROP,false,")),
395-
dbName);
396+
(String) null);
396397
}
397398
}
398399

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,11 @@ public Update visitTableDeviceAttributeUpdate(
315315

316316
@Override
317317
public Delete visitDeleteData(final RelationalDeleteDataNode node, final Void context) {
318-
final Delete statement = new Delete();
318+
final Delete statement =
319+
new Delete(
320+
new Table(
321+
QualifiedName.of(
322+
node.getDatabaseName(), node.getModEntries().get(0).getTableName())));
319323
statement.setDatabaseName(node.getDatabaseName());
320324
statement.setTableDeletionEntries(node.getModEntries());
321325
return statement;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
2424
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
2525
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
26-
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
2726
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
2827
import org.apache.iotdb.commons.pipe.source.IoTDBSource;
2928
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -237,11 +236,7 @@ public void validate(final PipeParameterValidator validator) throws Exception {
237236
EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
238237

239238
// Validate tree pattern and table pattern
240-
final TreePattern treePattern =
241-
TreePattern.parsePipePatternFromSourceParameters(validator.getParameters());
242-
final TablePattern tablePattern =
243-
TablePattern.parsePipePatternFromSourceParameters(validator.getParameters());
244-
validatePattern(treePattern, tablePattern);
239+
validatePattern(TreePattern.parsePipePatternFromSourceParameters(validator.getParameters()));
245240

246241
// Validate extractor.history.enable and extractor.realtime.enable
247242
validator
@@ -302,7 +297,7 @@ public void validate(final PipeParameterValidator validator) throws Exception {
302297
realtimeExtractor.validate(validator);
303298
}
304299

305-
private void validatePattern(final TreePattern treePattern, final TablePattern tablePattern) {
300+
private void validatePattern(final TreePattern treePattern) {
306301
if (!treePattern.isLegal()) {
307302
throw new IllegalArgumentException(String.format("Pattern \"%s\" is illegal.", treePattern));
308303
}
@@ -316,13 +311,6 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t
316311
"The path pattern %s is not valid for the source. Only prefix or full path is allowed.",
317312
treePattern));
318313
}
319-
320-
if (shouldExtractDeletion && tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) {
321-
throw new IllegalArgumentException(
322-
String.format(
323-
"The table model pattern %s can not be specified when deletion capture is enabled.",
324-
tablePattern));
325-
}
326314
}
327315

328316
private void checkInvalidParameters(final PipeParameterValidator validator) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeDataRegionAssigner;
2929
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
3030
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
31+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
3132
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3233

3334
import java.util.Objects;
@@ -141,7 +142,9 @@ public DeletionResource listenToDeleteData(
141142
final String regionId, final AbstractDeleteDataNode node) {
142143
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(regionId);
143144
// only events from registered data region will be extracted
144-
if (assigner == null) {
145+
if (assigner == null
146+
|| node instanceof RelationalDeleteDataNode
147+
&& ((RelationalDeleteDataNode) node).getModEntries().isEmpty()) {
145148
return null;
146149
}
147150

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
3131
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
3232
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
33-
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
3433
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
3534
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
3635

@@ -114,19 +113,19 @@ public PlanNodeType getType() {
114113

115114
@Override
116115
public PlanNode clone() {
117-
return new PipeEnrichedDeleteDataNode((DeleteDataNode) deleteDataNode.clone());
116+
return new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) deleteDataNode.clone());
118117
}
119118

120119
@Override
121120
public PlanNode createSubNode(final int subNodeId, final int startIndex, final int endIndex) {
122121
return new PipeEnrichedDeleteDataNode(
123-
(DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, endIndex));
122+
(AbstractDeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, endIndex));
124123
}
125124

126125
@Override
127126
public PlanNode cloneWithChildren(final List<PlanNode> children) {
128127
return new PipeEnrichedDeleteDataNode(
129-
(DeleteDataNode) deleteDataNode.cloneWithChildren(children));
128+
(AbstractDeleteDataNode) deleteDataNode.cloneWithChildren(children));
130129
}
131130

132131
@Override
@@ -157,7 +156,8 @@ protected void serializeAttributes(final DataOutputStream stream) throws IOExcep
157156
}
158157

159158
public static PipeEnrichedDeleteDataNode deserialize(final ByteBuffer buffer) {
160-
return new PipeEnrichedDeleteDataNode((DeleteDataNode) PlanNodeType.deserialize(buffer));
159+
return new PipeEnrichedDeleteDataNode(
160+
(AbstractDeleteDataNode) PlanNodeType.deserialize(buffer));
161161
}
162162

163163
@Override
@@ -183,7 +183,7 @@ public List<WritePlanNode> splitByPartition(final IAnalysis analysis) {
183183
plan ->
184184
plan instanceof PipeEnrichedDeleteDataNode
185185
? plan
186-
: new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
186+
: new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) plan))
187187
.collect(Collectors.toList());
188188
}
189189

@@ -206,6 +206,6 @@ public SearchNode merge(List<SearchNode> searchNodes) {
206206
(SearchNode) ((PipeEnrichedDeleteDataNode) searchNode).getDeleteDataNode())
207207
.collect(Collectors.toList());
208208
return new PipeEnrichedDeleteDataNode(
209-
(DeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
209+
(AbstractDeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
210210
}
211211
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ public class Delete extends Statement {
4444
private String databaseName;
4545
private Collection<TRegionReplicaSet> replicaSets;
4646

47-
public Delete() {
47+
public Delete(final Table table) {
4848
super(null);
49+
this.table = requireNonNull(table, "table is null");
4950
}
5051

5152
public Delete(final NodeLocation location, final Table table) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ public class PipeCommitInterval extends Interval<PipeCommitInterval> {
3333
private final PipeTaskMeta pipeTaskMeta;
3434

3535
public PipeCommitInterval(
36-
final long s,
37-
final long e,
36+
final long start,
37+
final long end,
3838
final ProgressIndex currentIndex,
3939
final List<Runnable> onCommittedHooks,
4040
final PipeTaskMeta pipeTaskMeta) {
41-
super(s, e);
41+
super(start, end);
4242
this.pipeTaskMeta = pipeTaskMeta;
4343
this.currentIndex = currentIndex;
4444
this.onCommittedHooks = onCommittedHooks;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ public class Interval<T extends Interval<T>> implements Comparable<Interval<?>>
2323
public long start;
2424
public long end;
2525

26-
public Interval(final long s, final long e) {
27-
start = s;
28-
end = e;
26+
public Interval(final long start, final long end) {
27+
this.start = start;
28+
this.end = end;
2929
}
3030

3131
public void onMerged(final T another) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,38 @@
1919

2020
package org.apache.iotdb.commons.pipe.datastructure.interval;
2121

22+
import javax.annotation.concurrent.NotThreadSafe;
23+
2224
import java.util.TreeSet;
2325

26+
@NotThreadSafe
2427
public class IntervalManager<T extends Interval<T>> {
2528
private final TreeSet<T> intervals = new TreeSet<>();
2629

2730
// insert into new interval and merge
2831
public void addInterval(final T newInterval) {
2932
// Left closest
30-
final T left = intervals.floor(newInterval);
33+
T left = intervals.floor(newInterval);
3134

3235
// Right closest
33-
final T right = intervals.ceiling(newInterval);
36+
T right = intervals.ceiling(newInterval);
3437

3538
// Merge left ([0,1] + [2,3] → [0,3])
36-
if (left != null && left.end >= newInterval.start - 1) {
39+
while (left != null && left.end >= newInterval.start - 1) {
3740
newInterval.start = Math.min(left.start, newInterval.start);
3841
newInterval.end = Math.max(left.end, newInterval.end);
3942
newInterval.onMerged(left);
4043
intervals.remove(left);
44+
left = intervals.floor(newInterval);
4145
}
4246

4347
// Merge right ([2,3] + [3,4] → [2,4])
44-
if (right != null && newInterval.end >= right.start - 1) {
48+
while (right != null && newInterval.end >= right.start - 1) {
4549
newInterval.start = Math.min(newInterval.start, right.start);
4650
newInterval.end = Math.max(newInterval.end, right.end);
4751
newInterval.onMerged(right);
4852
intervals.remove(right);
53+
right = intervals.ceiling(newInterval);
4954
}
5055

5156
intervals.add(newInterval);

0 commit comments

Comments
 (0)